summaryrefslogtreecommitdiff
path: root/silx/io/test/test_dictdump.py
diff options
context:
space:
mode:
Diffstat (limited to 'silx/io/test/test_dictdump.py')
-rw-r--r--silx/io/test/test_dictdump.py411
1 files changed, 397 insertions, 14 deletions
diff --git a/silx/io/test/test_dictdump.py b/silx/io/test/test_dictdump.py
index b99116b..93c9183 100644
--- a/silx/io/test/test_dictdump.py
+++ b/silx/io/test/test_dictdump.py
@@ -33,6 +33,7 @@ import os
import tempfile
import unittest
import h5py
+from copy import deepcopy
from collections import defaultdict
@@ -72,7 +73,63 @@ link_attrs["links"]["absolute_softlink"] = h5py.SoftLink("/links/group/dataset")
link_attrs["links"]["external_link"] = h5py.ExternalLink(ext_filename, "/ext_group/dataset")
-class TestDictToH5(unittest.TestCase):
+class DictTestCase(unittest.TestCase):
+
+ def assertRecursiveEqual(self, expected, actual, nodes=tuple()):
+ err_msg = "\n\n Tree nodes: {}".format(nodes)
+ if isinstance(expected, dict):
+ self.assertTrue(isinstance(actual, dict), msg=err_msg)
+ self.assertEqual(
+ set(expected.keys()),
+ set(actual.keys()),
+ msg=err_msg
+ )
+ for k in actual:
+ self.assertRecursiveEqual(
+ expected[k],
+ actual[k],
+ nodes=nodes + (k,),
+ )
+ return
+ if isinstance(actual, numpy.ndarray):
+ actual = actual.tolist()
+ if isinstance(expected, numpy.ndarray):
+ expected = expected.tolist()
+
+ self.assertEqual(expected, actual, msg=err_msg)
+
+
+class H5DictTestCase(DictTestCase):
+
+ def _dictRoundTripNormalize(self, treedict):
+ """Convert the dictionary as expected from a round-trip
+ treedict -> dicttoh5 -> h5todict -> newtreedict
+ """
+ for key, value in list(treedict.items()):
+ if isinstance(value, dict):
+ self._dictRoundTripNormalize(value)
+
+ # Expand treedict[("group", "attr_name")]
+ # to treedict["group"]["attr_name"]
+ for key, value in list(treedict.items()):
+ if not isinstance(key, tuple):
+ continue
+ # Put the attribute inside the group
+ grpname, attr = key
+ if not grpname:
+ continue
+ group = treedict.setdefault(grpname, dict())
+ if isinstance(group, dict):
+ del treedict[key]
+ group[("", attr)] = value
+
+ def dictRoundTripNormalize(self, treedict):
+ treedict2 = deepcopy(treedict)
+ self._dictRoundTripNormalize(treedict2)
+ return treedict2
+
+
+class TestDictToH5(H5DictTestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5")
@@ -110,14 +167,13 @@ class TestDictToH5(unittest.TestCase):
min(ddict["city attributes"]["Europe"]["France"]["Grenoble"]["coordinates"]),
5.7196)
- def testH5Overwrite(self):
+ def testH5OverwriteDeprecatedApi(self):
dd = ConfigDict({'t': True})
dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a')
dd = ConfigDict({'t': False})
- with TestLogging(dictdump_logger, warning=1):
- dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a',
- overwrite_data=False)
+ dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a',
+ overwrite_data=False)
res = h5todict(self.h5_fname)
assert(res['t'] == True)
@@ -200,8 +256,7 @@ class TestDictToH5(unittest.TestCase):
("group", "attr"): 10,
}
with h5py.File(self.h5_fname, "w") as h5file:
- with TestLogging(dictdump_logger, warning=1):
- dictdump.dicttoh5(ddict, h5file)
+ dictdump.dicttoh5(ddict, h5file)
self.assertEqual(h5file["group"].attrs['attr'], 10)
def testFlatDict(self):
@@ -241,8 +296,223 @@ class TestDictToH5(unittest.TestCase):
numpy.testing.assert_array_equal(h5py_read_dataset(h5file["darks"]["0"]),
ddict['darks']['0'])
-
-class TestH5ToDict(unittest.TestCase):
+ def testOverwrite(self):
+ # Tree structure that will be tested
+ group1 = {
+ ("", "attr2"): "original2",
+ "dset1": 0,
+ "dset2": [0, 1],
+ ("dset1", "attr1"): "original1",
+ ("dset1", "attr2"): "original2",
+ ("dset2", "attr1"): "original1",
+ ("dset2", "attr2"): "original2",
+ }
+ group2 = {
+ "subgroup1": group1.copy(),
+ "subgroup2": group1.copy(),
+ ("subgroup1", "attr1"): "original1",
+ ("subgroup2", "attr1"): "original1"
+ }
+ group2.update(group1)
+ # initial HDF5 tree
+ otreedict = {
+ ('', 'attr1'): "original1",
+ ('', 'attr2'): "original2",
+ 'group1': group1,
+ 'group2': group2,
+ ('group1', 'attr1'): "original1",
+ ('group2', 'attr1'): "original1"
+ }
+ wtreedict = None # dumped dictionary
+ etreedict = None # expected HDF5 tree after dump
+
+ def reset_file():
+ dicttoh5(
+ otreedict,
+ h5file=self.h5_fname,
+ mode="w",
+ )
+
+ def append_file(update_mode):
+ dicttoh5(
+ wtreedict,
+ h5file=self.h5_fname,
+ mode="a",
+ update_mode=update_mode
+ )
+
+ def assert_file():
+ rtreedict = h5todict(
+ self.h5_fname,
+ include_attributes=True,
+ asarray=False
+ )
+ netreedict = self.dictRoundTripNormalize(etreedict)
+ try:
+ self.assertRecursiveEqual(netreedict, rtreedict)
+ except AssertionError:
+ from pprint import pprint
+ print("\nDUMP:")
+ pprint(wtreedict)
+ print("\nEXPECTED:")
+ pprint(netreedict)
+ print("\nHDF5:")
+ pprint(rtreedict)
+ raise
+
+ def assert_append(update_mode):
+ append_file(update_mode)
+ assert_file()
+
+ # Test wrong arguments
+ with self.assertRaises(ValueError):
+ dicttoh5(
+ otreedict,
+ h5file=self.h5_fname,
+ mode="w",
+ update_mode="wrong-value"
+ )
+
+ # No writing
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ assert_file()
+
+ # Write identical dictionary
+ wtreedict = deepcopy(otreedict)
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add", "modify", "replace"]:
+ assert_append(update_mode)
+
+ # Write empty dictionary
+ wtreedict = dict()
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add", "modify", "replace"]:
+ assert_append(update_mode)
+
+ # Modified dataset
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = dict()
+ wtreedict["group2"]["subgroup2"]["dset1"] = {"dset3": [10, 20]}
+ wtreedict["group2"]["subgroup2"]["dset2"] = [10, 20]
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ etreedict["group2"]["subgroup2"]["dset2"] = [10, 20]
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = dict()
+ etreedict["group2"]["subgroup2"]["dset1"] = {"dset3": [10, 20]}
+ etreedict["group2"]["subgroup2"]["dset2"] = [10, 20]
+ assert_append("replace")
+
+ # Modified group
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = [0, 1]
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add", "modify"]:
+ assert_append(update_mode)
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = [0, 1]
+ assert_append("replace")
+
+ # Modified attribute
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = dict()
+ wtreedict["group2"]["subgroup2"][("dset1", "attr1")] = "modified"
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ etreedict["group2"]["subgroup2"][("dset1", "attr1")] = "modified"
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = dict()
+ etreedict["group2"]["subgroup2"]["dset1"] = dict()
+ etreedict["group2"]["subgroup2"]["dset1"][("", "attr1")] = "modified"
+ assert_append("replace")
+
+ # Delete group
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = None
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ del etreedict["group2"]["subgroup2"]
+ del etreedict["group2"][("subgroup2", "attr1")]
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ assert_append("replace")
+
+ # Delete dataset
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = dict()
+ wtreedict["group2"]["subgroup2"]["dset2"] = None
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ del etreedict["group2"]["subgroup2"]["dset2"]
+ del etreedict["group2"]["subgroup2"][("dset2", "attr1")]
+ del etreedict["group2"]["subgroup2"][("dset2", "attr2")]
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = dict()
+ assert_append("replace")
+
+ # Delete attribute
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = dict()
+ wtreedict["group2"]["subgroup2"][("dset2", "attr1")] = None
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ del etreedict["group2"]["subgroup2"][("dset2", "attr1")]
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = dict()
+ etreedict["group2"]["subgroup2"]["dset2"] = dict()
+ assert_append("replace")
+
+
+class TestH5ToDict(H5DictTestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5")
@@ -313,7 +583,7 @@ class TestH5ToDict(unittest.TestCase):
numpy.testing.assert_array_equal(ddict[("", "attr_2utf8")], adict[("", "attr_2utf8")])
-class TestDictToNx(unittest.TestCase):
+class TestDictToNx(H5DictTestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.h5_fname = os.path.join(self.tempdir, "nx.h5")
@@ -416,8 +686,121 @@ class TestDictToNx(unittest.TestCase):
with h5py.File(self.h5_fname, "r") as h5file:
self.assertEqual(h5file["/links/group/subgroup/relative_softlink"][()], 10)
-
-class TestNxToDict(unittest.TestCase):
+ def testOverwrite(self):
+ entry_name = "entry"
+ wtreedict = {
+ "group1": {"a": 1, "b": 2},
+ "group2@attr3": "attr3",
+ "group2@attr4": "attr4",
+ "group2": {
+ "@attr1": "attr1",
+ "@attr2": "attr2",
+ "c": 3,
+ "d": 4,
+ "dataset4": 8,
+ "dataset4@units": "keV",
+ },
+ "group3": {"subgroup": {"e": 9, "f": 10}},
+ "dataset1": 5,
+ "dataset2": 6,
+ "dataset3": 7,
+ "dataset3@units": "mm",
+ }
+ esubtree = {
+ "@NX_class": "NXentry",
+ "group1": {"@NX_class": "NXcollection", "a": 1, "b": 2},
+ "group2": {
+ "@NX_class": "NXcollection",
+ "@attr1": "attr1",
+ "@attr2": "attr2",
+ "@attr3": "attr3",
+ "@attr4": "attr4",
+ "c": 3,
+ "d": 4,
+ "dataset4": 8,
+ "dataset4@units": "keV",
+ },
+ "group3": {
+ "@NX_class": "NXcollection",
+ "subgroup": {"@NX_class": "NXcollection", "e": 9, "f": 10},
+ },
+ "dataset1": 5,
+ "dataset2": 6,
+ "dataset3": 7,
+ "dataset3@units": "mm",
+ }
+ etreedict = {entry_name: esubtree}
+
+ def append_file(update_mode, add_nx_class):
+ dictdump.dicttonx(
+ wtreedict,
+ h5file=self.h5_fname,
+ mode="a",
+ h5path=entry_name,
+ update_mode=update_mode,
+ add_nx_class=add_nx_class
+ )
+
+ def assert_file():
+ rtreedict = dictdump.nxtodict(
+ self.h5_fname,
+ include_attributes=True,
+ asarray=False,
+ )
+ netreedict = self.dictRoundTripNormalize(etreedict)
+ try:
+ self.assertRecursiveEqual(netreedict, rtreedict)
+ except AssertionError:
+ from pprint import pprint
+ print("\nDUMP:")
+ pprint(wtreedict)
+ print("\nEXPECTED:")
+ pprint(netreedict)
+ print("\nHDF5:")
+ pprint(rtreedict)
+ raise
+
+ def assert_append(update_mode, add_nx_class=None):
+ append_file(update_mode, add_nx_class=add_nx_class)
+ assert_file()
+
+ # First to an empty file
+ assert_append(None)
+
+ # Add non-existing attributes/datasets/groups
+ wtreedict["group1"].pop("a")
+ wtreedict["group2"].pop("@attr1")
+ wtreedict["group2"]["@attr2"] = "attr3" # only for update
+ wtreedict["group2"]["@type"] = "test"
+ wtreedict["group2"]["dataset4"] = 9 # only for update
+ del wtreedict["group2"]["dataset4@units"]
+ wtreedict["group3"] = {}
+ esubtree["group2"]["@type"] = "test"
+ assert_append("add")
+
+ # Add update existing attributes and datasets
+ esubtree["group2"]["@attr2"] = "attr3"
+ esubtree["group2"]["dataset4"] = 9
+ assert_append("modify")
+
+ # Do not add missing NX_class by default when updating
+ wtreedict["group2"]["@NX_class"] = "NXprocess"
+ esubtree["group2"]["@NX_class"] = "NXprocess"
+ assert_append("modify")
+ del wtreedict["group2"]["@NX_class"]
+ assert_append("modify")
+
+ # Overwrite existing groups/datasets/attributes
+ esubtree["group1"].pop("a")
+ esubtree["group2"].pop("@attr1")
+ esubtree["group2"]["@NX_class"] = "NXcollection"
+ esubtree["group2"]["dataset4"] = 9
+ del esubtree["group2"]["dataset4@units"]
+ esubtree["group3"] = {"@NX_class": "NXcollection"}
+ assert_append("replace", add_nx_class=True)
+
+
+class TestNxToDict(H5DictTestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.h5_fname = os.path.join(self.tempdir, "nx.h5")
@@ -510,7 +893,7 @@ class TestNxToDict(unittest.TestCase):
h5todict(self.h5_fname, path="/Mars", errors='raise')
-class TestDictToJson(unittest.TestCase):
+class TestDictToJson(DictTestCase):
def setUp(self):
self.dir_path = tempfile.mkdtemp()
self.json_fname = os.path.join(self.dir_path, "cityattrs.json")
@@ -528,7 +911,7 @@ class TestDictToJson(unittest.TestCase):
self.assertIn('"inhabitants": 160215', json_content)
-class TestDictToIni(unittest.TestCase):
+class TestDictToIni(DictTestCase):
def setUp(self):
self.dir_path = tempfile.mkdtemp()
self.ini_fname = os.path.join(self.dir_path, "test.ini")