diff options
Diffstat (limited to 'silx/gui/utils/test/test_async.py')
-rw-r--r-- | silx/gui/utils/test/test_async.py | 138 |
1 files changed, 0 insertions, 138 deletions
diff --git a/silx/gui/utils/test/test_async.py b/silx/gui/utils/test/test_async.py deleted file mode 100644 index dcfde1d..0000000 --- a/silx/gui/utils/test/test_async.py +++ /dev/null @@ -1,138 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# -# Copyright (c) 2018 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ###########################################################################*/ -"""Test of async module.""" - -from __future__ import absolute_import - -__authors__ = ["T. Vincent"] -__license__ = "MIT" -__date__ = "09/03/2018" - - -import threading -import unittest - - -from concurrent.futures import wait -from silx.gui import qt -from silx.gui.utils.testutils import TestCaseQt - -from silx.gui.utils import concurrent - - -class TestSubmitToQtThread(TestCaseQt): - """Test submission of tasks to Qt main thread""" - - def setUp(self): - # Reset executor to test lazy-loading in different conditions - concurrent._executor = None - super(TestSubmitToQtThread, self).setUp() - - def _task(self, value1, value2): - return value1, value2 - - def _taskWithException(self, *args, **kwargs): - raise RuntimeError('task exception') - - def testFromMainThread(self): - """Call submitToQtMainThread from the main thread""" - value1, value2 = 0, 1 - future = concurrent.submitToQtMainThread(self._task, value1, value2=value2) - self.assertTrue(future.done()) - self.assertEqual(future.result(1), (value1, value2)) - self.assertIsNone(future.exception(1)) - - future = concurrent.submitToQtMainThread(self._taskWithException) - self.assertTrue(future.done()) - with self.assertRaises(RuntimeError): - future.result(1) - self.assertIsInstance(future.exception(1), RuntimeError) - - def _threadedTest(self): - """Function run in a thread for the tests""" - value1, value2 = 0, 1 - future = concurrent.submitToQtMainThread(self._task, value1, value2=value2) - - wait([future], 3) - - self.assertTrue(future.done()) - self.assertEqual(future.result(1), (value1, value2)) - self.assertIsNone(future.exception(1)) - - future = concurrent.submitToQtMainThread(self._taskWithException) - - wait([future], 3) - - self.assertTrue(future.done()) - with self.assertRaises(RuntimeError): - future.result(1) - self.assertIsInstance(future.exception(1), RuntimeError) - - def testFromPythonThread(self): - """Call submitToQtMainThread from a Python thread""" - thread = threading.Thread(target=self._threadedTest) - thread.start() - for i in range(100): # Loop over for 10 seconds - self.qapp.processEvents() - thread.join(0.1) - if not thread.is_alive(): - break - else: - self.fail(('Thread task still running')) - - def testFromQtThread(self): - """Call submitToQtMainThread from a Qt thread pool""" - class Runner(qt.QRunnable): - def __init__(self, fn): - super(Runner, self).__init__() - self._fn = fn - - def run(self): - self._fn() - - def autoDelete(self): - return True - - threadPool = qt.silxGlobalThreadPool() - runner = Runner(self._threadedTest) - threadPool.start(runner) - for i in range(100): # Loop over for 10 seconds - self.qapp.processEvents() - done = threadPool.waitForDone(100) - if done: - break - else: - self.fail('Thread pool task still running') - - -def suite(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase( - TestSubmitToQtThread)) - return test_suite - - -if __name__ == '__main__': - unittest.main(defaultTest='suite') |