summaryrefslogtreecommitdiff
path: root/silx/gui/widgets/test/test_threadpoolpushbutton.py
diff options
context:
space:
mode:
Diffstat (limited to 'silx/gui/widgets/test/test_threadpoolpushbutton.py')
-rw-r--r--silx/gui/widgets/test/test_threadpoolpushbutton.py24
1 files changed, 15 insertions, 9 deletions
diff --git a/silx/gui/widgets/test/test_threadpoolpushbutton.py b/silx/gui/widgets/test/test_threadpoolpushbutton.py
index 126f8f3..a8618a4 100644
--- a/silx/gui/widgets/test/test_threadpoolpushbutton.py
+++ b/silx/gui/widgets/test/test_threadpoolpushbutton.py
@@ -26,7 +26,7 @@
__authors__ = ["V. Valls"]
__license__ = "MIT"
-__date__ = "15/12/2016"
+__date__ = "17/01/2018"
import unittest
@@ -35,7 +35,7 @@ from silx.gui import qt
from silx.gui.test.utils import TestCaseQt
from silx.gui.test.utils import SignalListener
from silx.gui.widgets.ThreadPoolPushButton import ThreadPoolPushButton
-from silx.test.utils import TestLogging
+from silx.utils.testutils import TestLogging
class TestThreadPoolPushButton(TestCaseQt):
@@ -44,6 +44,14 @@ class TestThreadPoolPushButton(TestCaseQt):
super(TestThreadPoolPushButton, self).setUp()
self._result = []
+ def waitForPendingOperations(self, object):
+ for i in range(50):
+ if not object.hasPendingOperations():
+ break
+ self.qWait(10)
+ else:
+ raise RuntimeError("Still waiting for a pending operation")
+
def _trace(self, name, delay=0):
self._result.append(name)
if delay != 0:
@@ -61,27 +69,25 @@ class TestThreadPoolPushButton(TestCaseQt):
button.executeCallable()
time.sleep(0.1)
self.assertListEqual(self._result, ["a"])
- self.qapp.processEvents()
+ self.waitForPendingOperations(button)
def testMultiExecution(self):
button = ThreadPoolPushButton()
button.setCallable(self._trace, "a", 0)
- number = qt.QThreadPool.globalInstance().maxThreadCount() * 2
+ number = qt.silxGlobalThreadPool().maxThreadCount()
for _ in range(number):
button.executeCallable()
- time.sleep(number * 0.01 + 0.1)
+ self.waitForPendingOperations(button)
self.assertListEqual(self._result, ["a"] * number)
- self.qapp.processEvents()
def testSaturateThreadPool(self):
button = ThreadPoolPushButton()
button.setCallable(self._trace, "a", 100)
- number = qt.QThreadPool.globalInstance().maxThreadCount() * 2
+ number = qt.silxGlobalThreadPool().maxThreadCount() * 2
for _ in range(number):
button.executeCallable()
- time.sleep(number * 0.1 + 0.1)
+ self.waitForPendingOperations(button)
self.assertListEqual(self._result, ["a"] * number)
- self.qapp.processEvents()
def testSuccess(self):
listener = SignalListener()