diff options
Diffstat (limited to 'silx/io/test/test_dictdump.py')
-rw-r--r-- | silx/io/test/test_dictdump.py | 411 |
1 files changed, 397 insertions, 14 deletions
diff --git a/silx/io/test/test_dictdump.py b/silx/io/test/test_dictdump.py index b99116b..93c9183 100644 --- a/silx/io/test/test_dictdump.py +++ b/silx/io/test/test_dictdump.py @@ -33,6 +33,7 @@ import os import tempfile import unittest import h5py +from copy import deepcopy from collections import defaultdict @@ -72,7 +73,63 @@ link_attrs["links"]["absolute_softlink"] = h5py.SoftLink("/links/group/dataset") link_attrs["links"]["external_link"] = h5py.ExternalLink(ext_filename, "/ext_group/dataset") -class TestDictToH5(unittest.TestCase): +class DictTestCase(unittest.TestCase): + + def assertRecursiveEqual(self, expected, actual, nodes=tuple()): + err_msg = "\n\n Tree nodes: {}".format(nodes) + if isinstance(expected, dict): + self.assertTrue(isinstance(actual, dict), msg=err_msg) + self.assertEqual( + set(expected.keys()), + set(actual.keys()), + msg=err_msg + ) + for k in actual: + self.assertRecursiveEqual( + expected[k], + actual[k], + nodes=nodes + (k,), + ) + return + if isinstance(actual, numpy.ndarray): + actual = actual.tolist() + if isinstance(expected, numpy.ndarray): + expected = expected.tolist() + + self.assertEqual(expected, actual, msg=err_msg) + + +class H5DictTestCase(DictTestCase): + + def _dictRoundTripNormalize(self, treedict): + """Convert the dictionary as expected from a round-trip + treedict -> dicttoh5 -> h5todict -> newtreedict + """ + for key, value in list(treedict.items()): + if isinstance(value, dict): + self._dictRoundTripNormalize(value) + + # Expand treedict[("group", "attr_name")] + # to treedict["group"]["attr_name"] + for key, value in list(treedict.items()): + if not isinstance(key, tuple): + continue + # Put the attribute inside the group + grpname, attr = key + if not grpname: + continue + group = treedict.setdefault(grpname, dict()) + if isinstance(group, dict): + del treedict[key] + group[("", attr)] = value + + def dictRoundTripNormalize(self, treedict): + treedict2 = deepcopy(treedict) + self._dictRoundTripNormalize(treedict2) + return treedict2 + + +class TestDictToH5(H5DictTestCase): def setUp(self): self.tempdir = tempfile.mkdtemp() self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5") @@ -110,14 +167,13 @@ class TestDictToH5(unittest.TestCase): min(ddict["city attributes"]["Europe"]["France"]["Grenoble"]["coordinates"]), 5.7196) - def testH5Overwrite(self): + def testH5OverwriteDeprecatedApi(self): dd = ConfigDict({'t': True}) dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a') dd = ConfigDict({'t': False}) - with TestLogging(dictdump_logger, warning=1): - dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a', - overwrite_data=False) + dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a', + overwrite_data=False) res = h5todict(self.h5_fname) assert(res['t'] == True) @@ -200,8 +256,7 @@ class TestDictToH5(unittest.TestCase): ("group", "attr"): 10, } with h5py.File(self.h5_fname, "w") as h5file: - with TestLogging(dictdump_logger, warning=1): - dictdump.dicttoh5(ddict, h5file) + dictdump.dicttoh5(ddict, h5file) self.assertEqual(h5file["group"].attrs['attr'], 10) def testFlatDict(self): @@ -241,8 +296,223 @@ class TestDictToH5(unittest.TestCase): numpy.testing.assert_array_equal(h5py_read_dataset(h5file["darks"]["0"]), ddict['darks']['0']) - -class TestH5ToDict(unittest.TestCase): + def testOverwrite(self): + # Tree structure that will be tested + group1 = { + ("", "attr2"): "original2", + "dset1": 0, + "dset2": [0, 1], + ("dset1", "attr1"): "original1", + ("dset1", "attr2"): "original2", + ("dset2", "attr1"): "original1", + ("dset2", "attr2"): "original2", + } + group2 = { + "subgroup1": group1.copy(), + "subgroup2": group1.copy(), + ("subgroup1", "attr1"): "original1", + ("subgroup2", "attr1"): "original1" + } + group2.update(group1) + # initial HDF5 tree + otreedict = { + ('', 'attr1'): "original1", + ('', 'attr2'): "original2", + 'group1': group1, + 'group2': group2, + ('group1', 'attr1'): "original1", + ('group2', 'attr1'): "original1" + } + wtreedict = None # dumped dictionary + etreedict = None # expected HDF5 tree after dump + + def reset_file(): + dicttoh5( + otreedict, + h5file=self.h5_fname, + mode="w", + ) + + def append_file(update_mode): + dicttoh5( + wtreedict, + h5file=self.h5_fname, + mode="a", + update_mode=update_mode + ) + + def assert_file(): + rtreedict = h5todict( + self.h5_fname, + include_attributes=True, + asarray=False + ) + netreedict = self.dictRoundTripNormalize(etreedict) + try: + self.assertRecursiveEqual(netreedict, rtreedict) + except AssertionError: + from pprint import pprint + print("\nDUMP:") + pprint(wtreedict) + print("\nEXPECTED:") + pprint(netreedict) + print("\nHDF5:") + pprint(rtreedict) + raise + + def assert_append(update_mode): + append_file(update_mode) + assert_file() + + # Test wrong arguments + with self.assertRaises(ValueError): + dicttoh5( + otreedict, + h5file=self.h5_fname, + mode="w", + update_mode="wrong-value" + ) + + # No writing + reset_file() + etreedict = deepcopy(otreedict) + assert_file() + + # Write identical dictionary + wtreedict = deepcopy(otreedict) + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in [None, "add", "modify", "replace"]: + assert_append(update_mode) + + # Write empty dictionary + wtreedict = dict() + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in [None, "add", "modify", "replace"]: + assert_append(update_mode) + + # Modified dataset + wtreedict = dict() + wtreedict["group2"] = dict() + wtreedict["group2"]["subgroup2"] = dict() + wtreedict["group2"]["subgroup2"]["dset1"] = {"dset3": [10, 20]} + wtreedict["group2"]["subgroup2"]["dset2"] = [10, 20] + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in [None, "add"]: + assert_append(update_mode) + + etreedict["group2"]["subgroup2"]["dset2"] = [10, 20] + assert_append("modify") + + etreedict["group2"] = dict() + del etreedict[("group2", "attr1")] + etreedict["group2"]["subgroup2"] = dict() + etreedict["group2"]["subgroup2"]["dset1"] = {"dset3": [10, 20]} + etreedict["group2"]["subgroup2"]["dset2"] = [10, 20] + assert_append("replace") + + # Modified group + wtreedict = dict() + wtreedict["group2"] = dict() + wtreedict["group2"]["subgroup2"] = [0, 1] + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in [None, "add", "modify"]: + assert_append(update_mode) + + etreedict["group2"] = dict() + del etreedict[("group2", "attr1")] + etreedict["group2"]["subgroup2"] = [0, 1] + assert_append("replace") + + # Modified attribute + wtreedict = dict() + wtreedict["group2"] = dict() + wtreedict["group2"]["subgroup2"] = dict() + wtreedict["group2"]["subgroup2"][("dset1", "attr1")] = "modified" + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in [None, "add"]: + assert_append(update_mode) + + etreedict["group2"]["subgroup2"][("dset1", "attr1")] = "modified" + assert_append("modify") + + etreedict["group2"] = dict() + del etreedict[("group2", "attr1")] + etreedict["group2"]["subgroup2"] = dict() + etreedict["group2"]["subgroup2"]["dset1"] = dict() + etreedict["group2"]["subgroup2"]["dset1"][("", "attr1")] = "modified" + assert_append("replace") + + # Delete group + wtreedict = dict() + wtreedict["group2"] = dict() + wtreedict["group2"]["subgroup2"] = None + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in [None, "add"]: + assert_append(update_mode) + + del etreedict["group2"]["subgroup2"] + del etreedict["group2"][("subgroup2", "attr1")] + assert_append("modify") + + etreedict["group2"] = dict() + del etreedict[("group2", "attr1")] + assert_append("replace") + + # Delete dataset + wtreedict = dict() + wtreedict["group2"] = dict() + wtreedict["group2"]["subgroup2"] = dict() + wtreedict["group2"]["subgroup2"]["dset2"] = None + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in [None, "add"]: + assert_append(update_mode) + + del etreedict["group2"]["subgroup2"]["dset2"] + del etreedict["group2"]["subgroup2"][("dset2", "attr1")] + del etreedict["group2"]["subgroup2"][("dset2", "attr2")] + assert_append("modify") + + etreedict["group2"] = dict() + del etreedict[("group2", "attr1")] + etreedict["group2"]["subgroup2"] = dict() + assert_append("replace") + + # Delete attribute + wtreedict = dict() + wtreedict["group2"] = dict() + wtreedict["group2"]["subgroup2"] = dict() + wtreedict["group2"]["subgroup2"][("dset2", "attr1")] = None + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in [None, "add"]: + assert_append(update_mode) + + del etreedict["group2"]["subgroup2"][("dset2", "attr1")] + assert_append("modify") + + etreedict["group2"] = dict() + del etreedict[("group2", "attr1")] + etreedict["group2"]["subgroup2"] = dict() + etreedict["group2"]["subgroup2"]["dset2"] = dict() + assert_append("replace") + + +class TestH5ToDict(H5DictTestCase): def setUp(self): self.tempdir = tempfile.mkdtemp() self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5") @@ -313,7 +583,7 @@ class TestH5ToDict(unittest.TestCase): numpy.testing.assert_array_equal(ddict[("", "attr_2utf8")], adict[("", "attr_2utf8")]) -class TestDictToNx(unittest.TestCase): +class TestDictToNx(H5DictTestCase): def setUp(self): self.tempdir = tempfile.mkdtemp() self.h5_fname = os.path.join(self.tempdir, "nx.h5") @@ -416,8 +686,121 @@ class TestDictToNx(unittest.TestCase): with h5py.File(self.h5_fname, "r") as h5file: self.assertEqual(h5file["/links/group/subgroup/relative_softlink"][()], 10) - -class TestNxToDict(unittest.TestCase): + def testOverwrite(self): + entry_name = "entry" + wtreedict = { + "group1": {"a": 1, "b": 2}, + "group2@attr3": "attr3", + "group2@attr4": "attr4", + "group2": { + "@attr1": "attr1", + "@attr2": "attr2", + "c": 3, + "d": 4, + "dataset4": 8, + "dataset4@units": "keV", + }, + "group3": {"subgroup": {"e": 9, "f": 10}}, + "dataset1": 5, + "dataset2": 6, + "dataset3": 7, + "dataset3@units": "mm", + } + esubtree = { + "@NX_class": "NXentry", + "group1": {"@NX_class": "NXcollection", "a": 1, "b": 2}, + "group2": { + "@NX_class": "NXcollection", + "@attr1": "attr1", + "@attr2": "attr2", + "@attr3": "attr3", + "@attr4": "attr4", + "c": 3, + "d": 4, + "dataset4": 8, + "dataset4@units": "keV", + }, + "group3": { + "@NX_class": "NXcollection", + "subgroup": {"@NX_class": "NXcollection", "e": 9, "f": 10}, + }, + "dataset1": 5, + "dataset2": 6, + "dataset3": 7, + "dataset3@units": "mm", + } + etreedict = {entry_name: esubtree} + + def append_file(update_mode, add_nx_class): + dictdump.dicttonx( + wtreedict, + h5file=self.h5_fname, + mode="a", + h5path=entry_name, + update_mode=update_mode, + add_nx_class=add_nx_class + ) + + def assert_file(): + rtreedict = dictdump.nxtodict( + self.h5_fname, + include_attributes=True, + asarray=False, + ) + netreedict = self.dictRoundTripNormalize(etreedict) + try: + self.assertRecursiveEqual(netreedict, rtreedict) + except AssertionError: + from pprint import pprint + print("\nDUMP:") + pprint(wtreedict) + print("\nEXPECTED:") + pprint(netreedict) + print("\nHDF5:") + pprint(rtreedict) + raise + + def assert_append(update_mode, add_nx_class=None): + append_file(update_mode, add_nx_class=add_nx_class) + assert_file() + + # First to an empty file + assert_append(None) + + # Add non-existing attributes/datasets/groups + wtreedict["group1"].pop("a") + wtreedict["group2"].pop("@attr1") + wtreedict["group2"]["@attr2"] = "attr3" # only for update + wtreedict["group2"]["@type"] = "test" + wtreedict["group2"]["dataset4"] = 9 # only for update + del wtreedict["group2"]["dataset4@units"] + wtreedict["group3"] = {} + esubtree["group2"]["@type"] = "test" + assert_append("add") + + # Add update existing attributes and datasets + esubtree["group2"]["@attr2"] = "attr3" + esubtree["group2"]["dataset4"] = 9 + assert_append("modify") + + # Do not add missing NX_class by default when updating + wtreedict["group2"]["@NX_class"] = "NXprocess" + esubtree["group2"]["@NX_class"] = "NXprocess" + assert_append("modify") + del wtreedict["group2"]["@NX_class"] + assert_append("modify") + + # Overwrite existing groups/datasets/attributes + esubtree["group1"].pop("a") + esubtree["group2"].pop("@attr1") + esubtree["group2"]["@NX_class"] = "NXcollection" + esubtree["group2"]["dataset4"] = 9 + del esubtree["group2"]["dataset4@units"] + esubtree["group3"] = {"@NX_class": "NXcollection"} + assert_append("replace", add_nx_class=True) + + +class TestNxToDict(H5DictTestCase): def setUp(self): self.tempdir = tempfile.mkdtemp() self.h5_fname = os.path.join(self.tempdir, "nx.h5") @@ -510,7 +893,7 @@ class TestNxToDict(unittest.TestCase): h5todict(self.h5_fname, path="/Mars", errors='raise') -class TestDictToJson(unittest.TestCase): +class TestDictToJson(DictTestCase): def setUp(self): self.dir_path = tempfile.mkdtemp() self.json_fname = os.path.join(self.dir_path, "cityattrs.json") @@ -528,7 +911,7 @@ class TestDictToJson(unittest.TestCase): self.assertIn('"inhabitants": 160215', json_content) -class TestDictToIni(unittest.TestCase): +class TestDictToIni(DictTestCase): def setUp(self): self.dir_path = tempfile.mkdtemp() self.ini_fname = os.path.join(self.dir_path, "test.ini") |