summaryrefslogtreecommitdiff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py130
1 files changed, 92 insertions, 38 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 3a5c6ee4..52334300 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from _base import SQLBaseStore, _RollbackButIsFineException
+from ._base import SQLBaseStore, _RollbackButIsFineException
from twisted.internet import defer, reactor
@@ -75,8 +75,8 @@ class EventsStore(SQLBaseStore):
yield stream_orderings
stream_ordering_manager = stream_ordering_manager()
else:
- stream_ordering_manager = yield self._stream_id_gen.get_next_mult(
- self, len(events_and_contexts)
+ stream_ordering_manager = self._stream_id_gen.get_next_mult(
+ len(events_and_contexts)
)
with stream_ordering_manager as stream_orderings:
@@ -101,37 +101,23 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
@log_function
- def persist_event(self, event, context, backfilled=False,
+ def persist_event(self, event, context,
is_new_state=True, current_state=None):
- stream_ordering = None
- if backfilled:
- self.min_stream_token -= 1
- stream_ordering = self.min_stream_token
-
- if stream_ordering is None:
- stream_ordering_manager = yield self._stream_id_gen.get_next(self)
- else:
- @contextmanager
- def stream_ordering_manager():
- yield stream_ordering
- stream_ordering_manager = stream_ordering_manager()
-
try:
- with stream_ordering_manager as stream_ordering:
+ with self._stream_id_gen.get_next() as stream_ordering:
event.internal_metadata.stream_ordering = stream_ordering
yield self.runInteraction(
"persist_event",
self._persist_event_txn,
event=event,
context=context,
- backfilled=backfilled,
is_new_state=is_new_state,
current_state=current_state,
)
except _RollbackButIsFineException:
pass
- max_persisted_id = yield self._stream_id_gen.get_max_token(self)
+ max_persisted_id = yield self._stream_id_gen.get_max_token()
defer.returnValue((stream_ordering, max_persisted_id))
@defer.inlineCallbacks
@@ -165,13 +151,38 @@ class EventsStore(SQLBaseStore):
defer.returnValue(events[0] if events else None)
+ @defer.inlineCallbacks
+ def get_events(self, event_ids, check_redacted=True,
+ get_prev_content=False, allow_rejected=False):
+ """Get events from the database
+
+ Args:
+ event_ids (list): The event_ids of the events to fetch
+ check_redacted (bool): If True, check if event has been redacted
+ and redact it.
+ get_prev_content (bool): If True and event is a state event,
+ include the previous states content in the unsigned field.
+ allow_rejected (bool): If True return rejected events.
+
+ Returns:
+ Deferred : Dict from event_id to event.
+ """
+ events = yield self._get_events(
+ event_ids,
+ check_redacted=check_redacted,
+ get_prev_content=get_prev_content,
+ allow_rejected=allow_rejected,
+ )
+
+ defer.returnValue({e.event_id: e for e in events})
+
@log_function
- def _persist_event_txn(self, txn, event, context, backfilled,
+ def _persist_event_txn(self, txn, event, context,
is_new_state=True, current_state=None):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
- txn.call_after(self.get_current_state_for_key.invalidate_all)
+ txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
@@ -198,7 +209,7 @@ class EventsStore(SQLBaseStore):
return self._persist_events_txn(
txn,
[(event, context)],
- backfilled=backfilled,
+ backfilled=False,
is_new_state=is_new_state,
)
@@ -455,7 +466,7 @@ class EventsStore(SQLBaseStore):
for event, _ in state_events_and_contexts:
if not context.rejected:
txn.call_after(
- self.get_current_state_for_key.invalidate,
+ self._get_current_state_for_key.invalidate,
(event.room_id, event.type, event.state_key,)
)
@@ -526,6 +537,9 @@ class EventsStore(SQLBaseStore):
if not event_ids:
defer.returnValue([])
+ event_id_list = event_ids
+ event_ids = set(event_ids)
+
event_map = self._get_events_from_cache(
event_ids,
check_redacted=check_redacted,
@@ -535,23 +549,18 @@ class EventsStore(SQLBaseStore):
missing_events_ids = [e for e in event_ids if e not in event_map]
- if not missing_events_ids:
- defer.returnValue([
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ])
-
- missing_events = yield self._enqueue_events(
- missing_events_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
+ if missing_events_ids:
+ missing_events = yield self._enqueue_events(
+ missing_events_ids,
+ check_redacted=check_redacted,
+ get_prev_content=get_prev_content,
+ allow_rejected=allow_rejected,
+ )
- event_map.update(missing_events)
+ event_map.update(missing_events)
defer.returnValue([
- event_map[e_id] for e_id in event_ids
+ event_map[e_id] for e_id in event_id_list
if e_id in event_map and event_map[e_id]
])
@@ -1064,3 +1073,48 @@ class EventsStore(SQLBaseStore):
yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
defer.returnValue(result)
+
+ def get_current_backfill_token(self):
+ """The current minimum token that backfilled events have reached"""
+
+ # TODO: Fix race with the persit_event txn by using one of the
+ # stream id managers
+ return -self.min_stream_token
+
+ def get_all_new_events(self, last_backfill_id, last_forward_id,
+ current_backfill_id, current_forward_id, limit):
+ """Get all the new events that have arrived at the server either as
+ new events or as backfilled events"""
+ def get_all_new_events_txn(txn):
+ sql = (
+ "SELECT e.stream_ordering, ej.internal_metadata, ej.json"
+ " FROM events as e"
+ " JOIN event_json as ej"
+ " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+ " WHERE ? < e.stream_ordering AND e.stream_ordering <= ?"
+ " ORDER BY e.stream_ordering ASC"
+ " LIMIT ?"
+ )
+ if last_forward_id != current_forward_id:
+ txn.execute(sql, (last_forward_id, current_forward_id, limit))
+ new_forward_events = txn.fetchall()
+ else:
+ new_forward_events = []
+
+ sql = (
+ "SELECT -e.stream_ordering, ej.internal_metadata, ej.json"
+ " FROM events as e"
+ " JOIN event_json as ej"
+ " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+ " WHERE ? > e.stream_ordering AND e.stream_ordering >= ?"
+ " ORDER BY e.stream_ordering DESC"
+ " LIMIT ?"
+ )
+ if last_backfill_id != current_backfill_id:
+ txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit))
+ new_backfill_events = txn.fetchall()
+ else:
+ new_backfill_events = []
+
+ return (new_forward_events, new_backfill_events)
+ return self.runInteraction("get_all_new_events", get_all_new_events_txn)