summaryrefslogtreecommitdiff
path: root/taurus/lib/taurus/core/util/test/test_timer.py
blob: 9ae1898353d0b04816efcbf124e9e06b4df055da (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/env python

#############################################################################
##
## This file is part of Taurus
## 
## http://taurus-scada.org
##
## Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
## 
## Taurus is free software: you can redistribute it and/or modify
## it under the terms of the GNU Lesser General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
## 
## Taurus is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU Lesser General Public License for more details.
## 
## You should have received a copy of the GNU Lesser General Public License
## along with Taurus.  If not, see <http://www.gnu.org/licenses/>.
##
#############################################################################

"""Test for taurus.core.util.timer"""

#__all__ = []

__docformat__ = 'restructuredtext'

import time
import threading
import numpy
from taurus.external import unittest
from taurus.core.util.timer import Timer

class TimerTest(unittest.TestCase):
    '''Test case for testing the taurus.core.util.timer.Timer class.

    A Timer is created with :meth:`_callback` as its function. The callback
    stores the time of each invocation and the test then checks the number of
    calls, the accumulated drift and the jitter of the measured periods.
    '''

    def setUp(self):
        '''Reset the call counter, the list of call times and the event used
        by :meth:`_callback` to signal that enough calls have been made.'''
        unittest.TestCase.setUp(self)
        self.__count = 0
        self.__calltimes = []
        # created here (and not in the test itself) so that _callback can
        # always rely on the event being available
        self.__nCalls = threading.Event()

    def tearDown(self):
        '''Discard the state recorded during the test.'''
        self.__count = 0
        self.__calltimes = []
        unittest.TestCase.tearDown(self)

    def test_calltimes(self):
        '''check the quality of the Timer's timing

        The timer is run until the callback reports ``n`` calls (or until a
        timeout expires). The test fails if the number of recorded calls is
        not ``n``, if the total elapsed time drifts from ``(n-1)*period`` by
        ``tol`` or more, or if the mean or the standard deviation of the
        measured periods are out of tolerance.
        '''
        period = .1  # in s
        n = 10  # make this >2 !
        timeout = n * period + 2  # expected time +2 seconds margin
        tol = 0.001  # time tolerance for the various checks (in s)

        # NOTE(review): sleep and n are presumably forwarded by the Timer to
        # _callback as keyword arguments -- confirm against the Timer API
        timer = Timer(period, self._callback, None, strict_timing=True,
                      sleep=.05, n=n)

        # Start the timer, wait till the callback has been called n times (or
        # the timeout expires) and then stop it
        self.__nCalls.clear()
        timer.start()
        try:
            self.__nCalls.wait(timeout)
        finally:
            # stop the timer even if the wait is interrupted, so that it does
            # not keep invoking the callback after the test is over
            timer.stop()
        self.__nCalls.clear()

        # checking number of calls
        ts = numpy.array(self.__calltimes)
        msg = '%i calls expected (got %i)' % (n, ts.size)
        self.assertEqual(ts.size, n, msg)

        # checking drift: n calls span n-1 periods
        totaltime = ts[-1] - ts[0]
        drift = abs(totaltime - ((n - 1) * period))
        msg = 'Too much drift (%g). Tolerance=%g' % (drift, tol)
        self.assertLess(drift, tol, msg)

        # checking period jitter (mean period and its standard dev)
        periods = numpy.diff(ts)
        mean = periods.mean()
        std = periods.std()
        msg = 'Wrong period. Expected: %g +/- %g Got: %g +/- %g' % (period, tol,
                                                                    mean, std)
        self.assertAlmostEqual(mean, period, msg=msg, delta=tol)
        self.assertLess(std, tol, msg)

    def _callback(self, sleep=0, n=5):
        '''store times at which it has been called, and signal when n calls
        have been done. If sleep>0 is passed, sleep that much in each call

        :param sleep: time (in s) to sleep in each call, after the call time
                      has been stored
        :param n: number of calls after which the event is set
        '''
        # store the time before sleeping, so that the sleep does not affect
        # the measured call times
        self.__calltimes.append(time.time())
        self.__count += 1
        time.sleep(sleep)
        if self.__count == n:
            self.__nCalls.set()  # signal that we have been called n times
        

if __name__ == '__main__':
    # run the tests defined in this module when it is executed as a script
    unittest.main()