diff options
Diffstat (limited to 'silx/gui/hdf5/test/test_hdf5.py')
-rw-r--r-- | silx/gui/hdf5/test/test_hdf5.py | 256 |
1 files changed, 171 insertions, 85 deletions
diff --git a/silx/gui/hdf5/test/test_hdf5.py b/silx/gui/hdf5/test/test_hdf5.py index 44c4456..fc27f6b 100644 --- a/silx/gui/hdf5/test/test_hdf5.py +++ b/silx/gui/hdf5/test/test_hdf5.py @@ -26,7 +26,7 @@ __authors__ = ["V. Valls"] __license__ = "MIT" -__date__ = "20/02/2018" +__date__ = "03/05/2018" import time @@ -39,6 +39,7 @@ from contextlib import contextmanager from silx.gui import qt from silx.gui.test.utils import TestCaseQt from silx.gui import hdf5 +from silx.gui.test.utils import SignalListener from silx.io import commonh5 import weakref @@ -48,6 +49,29 @@ except ImportError: h5py = None +_tmpDirectory = None + + +def setUpModule(): + global _tmpDirectory + _tmpDirectory = tempfile.mkdtemp(prefix=__name__) + + if h5py is not None: + filename = _tmpDirectory + "/data.h5" + + # create h5 data + f = h5py.File(filename, "w") + g = f.create_group("arrays") + g.create_dataset("scalar", data=10) + f.close() + + +def tearDownModule(): + global _tmpDirectory + shutil.rmtree(_tmpDirectory) + _tmpDirectory = None + + _called = 0 @@ -71,7 +95,7 @@ class TestHdf5TreeModel(TestCaseQt): self.skipTest("h5py is not available") def waitForPendingOperations(self, model): - for i in range(10): + for _ in range(10): if not model.hasPendingOperations(): break self.qWait(10) @@ -97,53 +121,53 @@ class TestHdf5TreeModel(TestCaseQt): self.assertIsNotNone(model) def testAppendFilename(self): - with self.h5TempFile() as filename: + filename = _tmpDirectory + "/data.h5" + model = hdf5.Hdf5TreeModel() + self.assertEquals(model.rowCount(qt.QModelIndex()), 0) + model.appendFile(filename) + self.assertEquals(model.rowCount(qt.QModelIndex()), 1) + # clean up + index = model.index(0, 0, qt.QModelIndex()) + h5File = model.data(index, hdf5.Hdf5TreeModel.H5PY_OBJECT_ROLE) + ref = weakref.ref(model) + model = None + self.qWaitForDestroy(ref) + + def testAppendBadFilename(self): + model = hdf5.Hdf5TreeModel() + self.assertRaises(IOError, model.appendFile, "#%$") + + def testInsertFilename(self): + filename = _tmpDirectory + "/data.h5" + try: model = hdf5.Hdf5TreeModel() self.assertEquals(model.rowCount(qt.QModelIndex()), 0) - model.appendFile(filename) + model.insertFile(filename) self.assertEquals(model.rowCount(qt.QModelIndex()), 1) # clean up index = model.index(0, 0, qt.QModelIndex()) h5File = model.data(index, hdf5.Hdf5TreeModel.H5PY_OBJECT_ROLE) + self.assertIsNotNone(h5File) + finally: ref = weakref.ref(model) model = None self.qWaitForDestroy(ref) - def testAppendBadFilename(self): - model = hdf5.Hdf5TreeModel() - self.assertRaises(IOError, model.appendFile, "#%$") - - def testInsertFilename(self): - with self.h5TempFile() as filename: - try: - model = hdf5.Hdf5TreeModel() - self.assertEquals(model.rowCount(qt.QModelIndex()), 0) - model.insertFile(filename) - self.assertEquals(model.rowCount(qt.QModelIndex()), 1) - # clean up - index = model.index(0, 0, qt.QModelIndex()) - h5File = model.data(index, hdf5.Hdf5TreeModel.H5PY_OBJECT_ROLE) - self.assertIsNotNone(h5File) - finally: - ref = weakref.ref(model) - model = None - self.qWaitForDestroy(ref) - def testInsertFilenameAsync(self): - with self.h5TempFile() as filename: - try: - model = hdf5.Hdf5TreeModel() - self.assertEquals(model.rowCount(qt.QModelIndex()), 0) - model.insertFileAsync(filename) - index = model.index(0, 0, qt.QModelIndex()) - self.assertIsInstance(model.nodeFromIndex(index), hdf5.Hdf5LoadingItem.Hdf5LoadingItem) - self.waitForPendingOperations(model) - index = model.index(0, 0, qt.QModelIndex()) - self.assertIsInstance(model.nodeFromIndex(index), hdf5.Hdf5Item.Hdf5Item) - finally: - ref = weakref.ref(model) - model = None - self.qWaitForDestroy(ref) + filename = _tmpDirectory + "/data.h5" + try: + model = hdf5.Hdf5TreeModel() + self.assertEquals(model.rowCount(qt.QModelIndex()), 0) + model.insertFileAsync(filename) + index = model.index(0, 0, qt.QModelIndex()) + self.assertIsInstance(model.nodeFromIndex(index), hdf5.Hdf5LoadingItem.Hdf5LoadingItem) + self.waitForPendingOperations(model) + index = model.index(0, 0, qt.QModelIndex()) + self.assertIsInstance(model.nodeFromIndex(index), hdf5.Hdf5Item.Hdf5Item) + finally: + ref = weakref.ref(model) + model = None + self.qWaitForDestroy(ref) def testInsertObject(self): h5 = commonh5.File("/foo/bar/1.mock", "w") @@ -162,36 +186,37 @@ class TestHdf5TreeModel(TestCaseQt): self.assertEquals(model.rowCount(qt.QModelIndex()), 0) def testSynchronizeObject(self): - with self.h5TempFile() as filename: - h5 = h5py.File(filename) - model = hdf5.Hdf5TreeModel() - model.insertH5pyObject(h5) - self.assertEquals(model.rowCount(qt.QModelIndex()), 1) - index = model.index(0, 0, qt.QModelIndex()) - node1 = model.nodeFromIndex(index) - model.synchronizeH5pyObject(h5) - # Now h5 was loaded from it's filename - # Another ref is owned by the model - h5.close() + filename = _tmpDirectory + "/data.h5" + h5 = h5py.File(filename) + model = hdf5.Hdf5TreeModel() + model.insertH5pyObject(h5) + self.assertEquals(model.rowCount(qt.QModelIndex()), 1) + index = model.index(0, 0, qt.QModelIndex()) + node1 = model.nodeFromIndex(index) + model.synchronizeH5pyObject(h5) + self.waitForPendingOperations(model) + # Now h5 was loaded from it's filename + # Another ref is owned by the model + h5.close() - index = model.index(0, 0, qt.QModelIndex()) - node2 = model.nodeFromIndex(index) - self.assertIsNot(node1, node2) - # after sync - time.sleep(0.1) - self.qapp.processEvents() - time.sleep(0.1) - index = model.index(0, 0, qt.QModelIndex()) - self.assertIsInstance(model.nodeFromIndex(index), hdf5.Hdf5Item.Hdf5Item) - # clean up - index = model.index(0, 0, qt.QModelIndex()) - h5File = model.data(index, hdf5.Hdf5TreeModel.H5PY_OBJECT_ROLE) - self.assertIsNotNone(h5File) - h5File = None - # delete the model - ref = weakref.ref(model) - model = None - self.qWaitForDestroy(ref) + index = model.index(0, 0, qt.QModelIndex()) + node2 = model.nodeFromIndex(index) + self.assertIsNot(node1, node2) + # after sync + time.sleep(0.1) + self.qapp.processEvents() + time.sleep(0.1) + index = model.index(0, 0, qt.QModelIndex()) + self.assertIsInstance(model.nodeFromIndex(index), hdf5.Hdf5Item.Hdf5Item) + # clean up + index = model.index(0, 0, qt.QModelIndex()) + h5File = model.data(index, hdf5.Hdf5TreeModel.H5PY_OBJECT_ROLE) + self.assertIsNotNone(h5File) + h5File = None + # delete the model + ref = weakref.ref(model) + model = None + self.qWaitForDestroy(ref) def testFileMoveState(self): model = hdf5.Hdf5TreeModel() @@ -222,24 +247,24 @@ class TestHdf5TreeModel(TestCaseQt): self.assertNotEquals(model.supportedDropActions(), 0) def testDropExternalFile(self): - with self.h5TempFile() as filename: - model = hdf5.Hdf5TreeModel() - mimeData = qt.QMimeData() - mimeData.setUrls([qt.QUrl.fromLocalFile(filename)]) - model.dropMimeData(mimeData, qt.Qt.CopyAction, 0, 0, qt.QModelIndex()) - self.assertEquals(model.rowCount(qt.QModelIndex()), 1) - # after sync - self.waitForPendingOperations(model) - index = model.index(0, 0, qt.QModelIndex()) - self.assertIsInstance(model.nodeFromIndex(index), hdf5.Hdf5Item.Hdf5Item) - # clean up - index = model.index(0, 0, qt.QModelIndex()) - h5File = model.data(index, role=hdf5.Hdf5TreeModel.H5PY_OBJECT_ROLE) - self.assertIsNotNone(h5File) - h5File = None - ref = weakref.ref(model) - model = None - self.qWaitForDestroy(ref) + filename = _tmpDirectory + "/data.h5" + model = hdf5.Hdf5TreeModel() + mimeData = qt.QMimeData() + mimeData.setUrls([qt.QUrl.fromLocalFile(filename)]) + model.dropMimeData(mimeData, qt.Qt.CopyAction, 0, 0, qt.QModelIndex()) + self.assertEquals(model.rowCount(qt.QModelIndex()), 1) + # after sync + self.waitForPendingOperations(model) + index = model.index(0, 0, qt.QModelIndex()) + self.assertIsInstance(model.nodeFromIndex(index), hdf5.Hdf5Item.Hdf5Item) + # clean up + index = model.index(0, 0, qt.QModelIndex()) + h5File = model.data(index, role=hdf5.Hdf5TreeModel.H5PY_OBJECT_ROLE) + self.assertIsNotNone(h5File) + h5File = None + ref = weakref.ref(model) + model = None + self.qWaitForDestroy(ref) def getRowDataAsDict(self, model, row): displayed = {} @@ -337,6 +362,66 @@ class TestHdf5TreeModel(TestCaseQt): self.assertEquals(index, qt.QModelIndex()) +class TestHdf5TreeModelSignals(TestCaseQt): + + def setUp(self): + TestCaseQt.setUp(self) + self.model = hdf5.Hdf5TreeModel() + filename = _tmpDirectory + "/data.h5" + self.h5 = h5py.File(filename) + self.model.insertH5pyObject(self.h5) + + self.listener = SignalListener() + self.model.sigH5pyObjectLoaded.connect(self.listener.partial(signal="loaded")) + self.model.sigH5pyObjectRemoved.connect(self.listener.partial(signal="removed")) + self.model.sigH5pyObjectSynchronized.connect(self.listener.partial(signal="synchronized")) + + def tearDown(self): + self.signals = None + ref = weakref.ref(self.model) + self.model = None + self.qWaitForDestroy(ref) + self.h5.close() + self.h5 = None + TestCaseQt.tearDown(self) + + def waitForPendingOperations(self, model): + for _ in range(10): + if not model.hasPendingOperations(): + break + self.qWait(10) + else: + raise RuntimeError("Still waiting for a pending operation") + + def testInsert(self): + filename = _tmpDirectory + "/data.h5" + h5 = h5py.File(filename) + self.model.insertH5pyObject(h5) + self.assertEquals(self.listener.callCount(), 0) + + def testLoaded(self): + filename = _tmpDirectory + "/data.h5" + self.model.insertFile(filename) + self.assertEquals(self.listener.callCount(), 1) + self.assertEquals(self.listener.karguments(argumentName="signal")[0], "loaded") + self.assertIsNot(self.listener.arguments(callIndex=0)[0], self.h5) + self.assertEquals(self.listener.arguments(callIndex=0)[0].filename, filename) + + def testRemoved(self): + self.model.removeH5pyObject(self.h5) + self.assertEquals(self.listener.callCount(), 1) + self.assertEquals(self.listener.karguments(argumentName="signal")[0], "removed") + self.assertIs(self.listener.arguments(callIndex=0)[0], self.h5) + + def testSynchonized(self): + self.model.synchronizeH5pyObject(self.h5) + self.waitForPendingOperations(self.model) + self.assertEquals(self.listener.callCount(), 1) + self.assertEquals(self.listener.karguments(argumentName="signal")[0], "synchronized") + self.assertIs(self.listener.arguments(callIndex=0)[0], self.h5) + self.assertIsNot(self.listener.arguments(callIndex=0)[1], self.h5) + + class TestNexusSortFilterProxyModel(TestCaseQt): def getChildNames(self, model, index): @@ -873,6 +958,7 @@ def suite(): test_suite = unittest.TestSuite() loadTests = unittest.defaultTestLoader.loadTestsFromTestCase test_suite.addTest(loadTests(TestHdf5TreeModel)) + test_suite.addTest(loadTests(TestHdf5TreeModelSignals)) test_suite.addTest(loadTests(TestNexusSortFilterProxyModel)) test_suite.addTest(loadTests(TestHdf5TreeView)) test_suite.addTest(loadTests(TestH5Node)) |