summary | refs | log | tree | commit | diff
path: root/src/etcd/tests/integration/test_election.py
blob: 46ea2bb047eacbc6ab5bb1698d590cf20a7776a1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import etcd
from . import test_simple
import time
import unittest

class TestElection(test_simple.EtcdIntegrationTest):
    """Integration tests for the election interface reached via ``client.election``.

    Every test talks to a server through an ``etcd.Client`` bound to port
    6001, so a reachable etcd instance on that port is required
    (presumably provided by ``EtcdIntegrationTest`` -- TODO confirm).
    """

    def setUp(self):
        """Create a fresh client on port 6001 before each test."""
        self.client = etcd.Client(port=6001)

    def test_set_get_delete(self):
        """Set a value under a key, read it back, then delete it."""
        e = self.client.election
        res = e.set('/mysql', name='foo.example.com', ttl=30)
        # set() must hand back something other than the empty string.
        self.assertNotEqual(res, '')
        res = e.get('/mysql')
        # assertEqual, not the deprecated assertEquals alias (removed from
        # unittest in Python 3.12).
        self.assertEqual(res, 'foo.example.com')
        self.assertTrue(e.delete('/mysql', name='foo.example.com'))

    def test_set_invalid_ttl(self):
        """A non-numeric ttl must be rejected with ValueError."""
        self.assertRaises(
            ValueError,
            self.client.election.set,
            '/mysql',
            name='foo.example.com',
            ttl='ciao',
        )

    def test_get_non_existing(self):
        """This is actually expected to fail. See https://github.com/coreos/etcd/issues/446"""
        self.assertRaises(etcd.EtcdException, self.client.election.get, '/foobar')

    def test_delete_non_existing(self):
        """Deleting a key that was never set must raise EtcdException."""
        self.assertRaises(etcd.EtcdException, self.client.election.delete, '/foobar')

    def test_get_delete_after_ttl_expired_raises(self):
        """After the ttl has elapsed, both get and delete must raise."""
        e = self.client.election
        e.set('/mysql', name='foo', ttl=1)
        # Sleep longer than the 1-second ttl so the entry has lapsed.
        time.sleep(2)
        self.assertRaises(etcd.EtcdException, e.get, '/mysql')
        self.assertRaises(etcd.EtcdKeyNotFound, e.delete, '/mysql', name='foo')