diff options
author | Picca Frédéric-Emmanuel <picca@debian.org> | 2022-02-02 14:19:58 +0100 |
---|---|---|
committer | Picca Frédéric-Emmanuel <picca@debian.org> | 2022-02-02 14:19:58 +0100 |
commit | 4e774db12d5ebe7a20eded6dd434a289e27999e5 (patch) | |
tree | a9822974ba45196f1e3740995ab157d6eb214a04 /src/silx/gui/utils/test/test_async.py | |
parent | d3194b1a9c4404ba93afac43d97172ab24c57098 (diff) |
New upstream version 1.0.0+dfsg
Diffstat (limited to 'src/silx/gui/utils/test/test_async.py')
-rw-r--r-- | src/silx/gui/utils/test/test_async.py | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/src/silx/gui/utils/test/test_async.py b/src/silx/gui/utils/test/test_async.py new file mode 100644 index 0000000..7304ca9 --- /dev/null +++ b/src/silx/gui/utils/test/test_async.py @@ -0,0 +1,127 @@ +# coding: utf-8 +# /*########################################################################## +# +# Copyright (c) 2018 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ###########################################################################*/ +"""Test of async module.""" + +from __future__ import absolute_import + +__authors__ = ["T. Vincent"] +__license__ = "MIT" +__date__ = "09/03/2018" + + +import threading +import unittest + + +from concurrent.futures import wait +from silx.gui import qt +from silx.gui.utils.testutils import TestCaseQt + +from silx.gui.utils import concurrent + + +class TestSubmitToQtThread(TestCaseQt): + """Test submission of tasks to Qt main thread""" + + def setUp(self): + # Reset executor to test lazy-loading in different conditions + concurrent._executor = None + super(TestSubmitToQtThread, self).setUp() + + def _task(self, value1, value2): + return value1, value2 + + def _taskWithException(self, *args, **kwargs): + raise RuntimeError('task exception') + + def testFromMainThread(self): + """Call submitToQtMainThread from the main thread""" + value1, value2 = 0, 1 + future = concurrent.submitToQtMainThread(self._task, value1, value2=value2) + self.assertTrue(future.done()) + self.assertEqual(future.result(1), (value1, value2)) + self.assertIsNone(future.exception(1)) + + future = concurrent.submitToQtMainThread(self._taskWithException) + self.assertTrue(future.done()) + with self.assertRaises(RuntimeError): + future.result(1) + self.assertIsInstance(future.exception(1), RuntimeError) + + def _threadedTest(self): + """Function run in a thread for the tests""" + value1, value2 = 0, 1 + future = concurrent.submitToQtMainThread(self._task, value1, value2=value2) + + wait([future], 3) + + self.assertTrue(future.done()) + self.assertEqual(future.result(1), (value1, value2)) + self.assertIsNone(future.exception(1)) + + future = concurrent.submitToQtMainThread(self._taskWithException) + + wait([future], 3) + + self.assertTrue(future.done()) + with self.assertRaises(RuntimeError): + future.result(1) + self.assertIsInstance(future.exception(1), RuntimeError) + + def testFromPythonThread(self): + """Call submitToQtMainThread from a Python thread""" + thread = threading.Thread(target=self._threadedTest) + thread.start() + for i in range(100): # Loop over for 10 seconds + self.qapp.processEvents() + thread.join(0.1) + if not thread.is_alive(): + break + else: + self.fail(('Thread task still running')) + + def testFromQtThread(self): + """Call submitToQtMainThread from a Qt thread pool""" + class Runner(qt.QRunnable): + def __init__(self, fn): + super(Runner, self).__init__() + self._fn = fn + + def run(self): + self._fn() + + def autoDelete(self): + return True + + threadPool = qt.silxGlobalThreadPool() + runner = Runner(self._threadedTest) + threadPool.start(runner) + for i in range(100): # Loop over for 10 seconds + self.qapp.processEvents() + done = threadPool.waitForDone(100) + if done: + break + else: + self.fail('Thread pool task still running') |