summaryrefslogtreecommitdiff
path: root/synapse/storage/util
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2016-02-10 16:22:15 +0000
committerErik Johnston <erikj@matrix.org>2016-02-10 16:22:15 +0000
commit8e514df559bc1fadadf6ead5178537d0352ae221 (patch)
treee9f18d4f3044cab76ab5ae119505ea651194db5e /synapse/storage/util
parent2c402214a9fe51677adf05424cca7918b91c7949 (diff)
Imported Upstream version 0.13.0
Diffstat (limited to 'synapse/storage/util')
-rw-r--r--synapse/storage/util/__init__.py2
-rw-r--r--synapse/storage/util/id_generators.py38
2 files changed, 11 insertions, 29 deletions
diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py
index c488b10d..bfebb0f6 100644
--- a/synapse/storage/util/__init__.py
+++ b/synapse/storage/util/__init__.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
+# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index e956df62..5c522f4a 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
+# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -72,28 +72,24 @@ class StreamIdGenerator(object):
with stream_id_gen.get_next_txn(txn) as stream_id:
# ... persist event ...
"""
- def __init__(self, table, column):
+ def __init__(self, db_conn, table, column):
self.table = table
self.column = column
self._lock = threading.Lock()
- self._current_max = None
+ cur = db_conn.cursor()
+ self._current_max = self._get_or_compute_current_max(cur)
+ cur.close()
+
self._unfinished_ids = deque()
- @defer.inlineCallbacks
def get_next(self, store):
"""
Usage:
with yield stream_id_gen.get_next as stream_id:
# ... persist event ...
"""
- if not self._current_max:
- yield store.runInteraction(
- "_compute_current_max",
- self._get_or_compute_current_max,
- )
-
with self._lock:
self._current_max += 1
next_id = self._current_max
@@ -108,21 +104,14 @@ class StreamIdGenerator(object):
with self._lock:
self._unfinished_ids.remove(next_id)
- defer.returnValue(manager())
+ return manager()
- @defer.inlineCallbacks
def get_next_mult(self, store, n):
"""
Usage:
with yield stream_id_gen.get_next(store, n) as stream_ids:
# ... persist events ...
"""
- if not self._current_max:
- yield store.runInteraction(
- "_compute_current_max",
- self._get_or_compute_current_max,
- )
-
with self._lock:
next_ids = range(self._current_max + 1, self._current_max + n + 1)
self._current_max += n
@@ -139,24 +128,17 @@ class StreamIdGenerator(object):
for next_id in next_ids:
self._unfinished_ids.remove(next_id)
- defer.returnValue(manager())
+ return manager()
- @defer.inlineCallbacks
def get_max_token(self, store):
"""Returns the maximum stream id such that all stream ids less than or
equal to it have been successfully persisted.
"""
- if not self._current_max:
- yield store.runInteraction(
- "_compute_current_max",
- self._get_or_compute_current_max,
- )
-
with self._lock:
if self._unfinished_ids:
- defer.returnValue(self._unfinished_ids[0] - 1)
+ return self._unfinished_ids[0] - 1
- defer.returnValue(self._current_max)
+ return self._current_max
def _get_or_compute_current_max(self, txn):
with self._lock: