summaryrefslogtreecommitdiff
path: root/src/etcd/lock.py
blob: 687a548f3eaa68e464658a77d87bd70e64e4b085 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import logging
import etcd
import uuid

_log = logging.getLogger(__name__)

class Lock(object):
    """
    Locking recipe for etcd, inspired by the kazoo recipe for zookeeper
    """

    def __init__(self, client, lock_name):
        self.client = client
        self.name = lock_name
        # props to Netflix Curator for this trick. It is possible for our
        # create request to succeed on the server, but for a failure to
        # prevent us from getting back the full path name. We prefix our
        # lock name with a uuid and can check for its presence on retry.
        self._uuid = uuid.uuid4().hex
        self.path = "/_locks/{}".format(lock_name)
        self.is_taken = False
        self._sequence = None
        _log.debug("Initiating lock for %s with uuid %s", self.path, self._uuid)

    @property
    def uuid(self):
        """
        The unique id of the lock
        """
        return self._uuid

    @uuid.setter
    def set_uuid(self, value):
        old_uuid = self._uuid
        self._uuid = value
        if not self._find_lock():
            _log.warn("The hand-set uuid was not found, refusing")
            self._uuid = old_uuid
            raise ValueError("Inexistent UUID")

    @property
    def is_acquired(self):
        """
        tells us if the lock is acquired
        """
        if not self.is_taken:
            _log.debug("Lock not taken")
            return False
        try:
            self.client.read(self.lock_key)
            return True
        except etcd.EtcdKeyNotFound:
            _log.warn("Lock was supposedly taken, but we cannot find it")
            self.is_taken = False
            return False

    def acquire(self, blocking=True, lock_ttl=3600, timeout=None):
        """
        Acquire the lock.

        :param blocking Block until the lock is obtained, or timeout is reached
        :param lock_ttl The duration of the lock we acquired, set to None for eternal locks
        :param timeout The time to wait before giving up on getting a lock
        """
        # First of all try to write, if our lock is not present.
        if not self._find_lock():
            _log.debug("Lock not found, writing it to %s", self.path)
            res = self.client.write(self.path, self.uuid, ttl=lock_ttl, append=True)
            self._set_sequence(res.key)
            _log.debug("Lock key %s written, sequence is %s", res.key, self._sequence)
        elif lock_ttl:
            # Renew our lock if already here!
            self.client.write(self.lock_key, self.uuid, ttl=lock_ttl)

        # now get the owner of the lock, and the next lowest sequence
        return self._acquired(blocking=blocking, timeout=timeout)

    def release(self):
        """
        Release the lock
        """
        if not self._sequence:
            self._find_lock()
        try:
            _log.debug("Releasing existing lock %s", self.lock_key)
            self.client.delete(self.lock_key)
        except etcd.EtcdKeyNotFound:
            _log.info("Lock %s not found, nothing to release", self.lock_key)
            pass
        finally:
            self.is_taken = False

    def __enter__(self):
        """
        You can use the lock as a contextmanager
        """
        self.acquire(blocking=True, lock_ttl=0)

    def __exit__(self, type, value, traceback):
        self.release()

    def _acquired(self, blocking=True, timeout=0):
        locker, nearest = self._get_locker()
        self.is_taken = False
        if self.lock_key == locker:
            _log.debug("Lock acquired!")
            # We own the lock, yay!
            self.is_taken = True
            return True
        else:
            self.is_taken = False
            if not blocking:
                return False
            # Let's look for the lock
            watch_key = nearest
            _log.debug("Lock not acquired, now watching %s", watch_key)
            t = max(0, timeout)
            while True:
                try:
                    r = self.client.watch(watch_key, timeout=t)
                    _log.debug("Detected variation for %s: %s", r.key, r.action)
                    return self._acquired(blocking=True, timeout=timeout)
                except etcd.EtcdKeyNotFound:
                    _log.debug("Key %s not present anymore, moving on", watch_key)
                    return self._acquired(blocking=True, timeout=timeout)
                except etcd.EtcdException:
                    # TODO: log something...
                    pass

    @property
    def lock_key(self):
        if not self._sequence:
            raise ValueError("No sequence present.")
        return self.path + '/' + str(self._sequence)

    def _set_sequence(self, key):
        self._sequence = key.replace(self.path, '').lstrip('/')

    def _find_lock(self):
        if self._sequence:
            try:
                res = self.client.read(self.lock_key)
                self._uuid = res.value
                return True
            except etcd.EtcdKeyNotFound:
                return False
        elif self._uuid:
            try:
                for r in self.client.read(self.path, recursive=True).leaves:
                    if r.value == self._uuid:
                        self._set_sequence(r.key)
                        return True
            except etcd.EtcdKeyNotFound:
                pass
        return False

    def _get_locker(self):
        results = [res for res in
                   self.client.read(self.path, recursive=True).leaves]
        if not self._sequence:
            self._find_lock()
        l = sorted([r.key for r in results])
        _log.debug("Lock keys found: %s", l)
        try:
            i = l.index(self.lock_key)
            if i == 0:
                _log.debug("No key before our one, we are the locker")
                return (l[0], None)
            else:
                _log.debug("Locker: %s, key to watch: %s", l[0], l[i-1])
                return (l[0], l[i-1])
        except ValueError:
            # Something very wrong is going on, most probably
            # our lock has expired
            raise etcd.EtcdLockExpired(u"Lock not found")