# -*- coding: utf-8 -*- # Copyright 2018 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from synapse.storage._base import SQLBaseStore logger = logging.getLogger(__name__) class StateDeltasStore(SQLBaseStore): def get_current_state_deltas(self, prev_stream_id): """Fetch a list of room state changes since the given stream id Each entry in the result contains the following fields: - stream_id (int) - room_id (str) - type (str): event type - state_key (str): - event_id (str|None): new event_id for this state key. None if the state has been deleted. - prev_event_id (str|None): previous event_id for this state key. None if it's new state. Args: prev_stream_id (int): point to get changes since (exclusive) Returns: Deferred[list[dict]]: results """ prev_stream_id = int(prev_stream_id) if not self._curr_state_delta_stream_cache.has_any_entity_changed( prev_stream_id ): return [] def get_current_state_deltas_txn(txn): # First we calculate the max stream id that will give us less than # N results. # We arbitarily limit to 100 stream_id entries to ensure we don't # select toooo many. sql = """ SELECT stream_id, count(*) FROM current_state_delta_stream WHERE stream_id > ? GROUP BY stream_id ORDER BY stream_id ASC LIMIT 100 """ txn.execute(sql, (prev_stream_id,)) total = 0 max_stream_id = prev_stream_id for max_stream_id, count in txn: total += count if total > 100: # We arbitarily limit to 100 entries to ensure we don't # select toooo many. break # Now actually get the deltas sql = """ SELECT stream_id, room_id, type, state_key, event_id, prev_event_id FROM current_state_delta_stream WHERE ? < stream_id AND stream_id <= ? ORDER BY stream_id ASC """ txn.execute(sql, (prev_stream_id, max_stream_id)) return self.cursor_to_dict(txn) return self.runInteraction( "get_current_state_deltas", get_current_state_deltas_txn ) def _get_max_stream_id_in_current_state_deltas_txn(self, txn): return self._simple_select_one_onecol_txn( txn, table="current_state_delta_stream", keyvalues={}, retcol="COALESCE(MAX(stream_id), -1)", ) def get_max_stream_id_in_current_state_deltas(self): return self.runInteraction( "get_max_stream_id_in_current_state_deltas", self._get_max_stream_id_in_current_state_deltas_txn, )