summaryrefslogtreecommitdiff
path: root/silx/utils/testutils.py
diff options
context:
space:
mode:
Diffstat (limited to 'silx/utils/testutils.py')
-rwxr-xr-x[-rw-r--r--]silx/utils/testutils.py73
1 files changed, 57 insertions, 16 deletions
diff --git a/silx/utils/testutils.py b/silx/utils/testutils.py
index 82c2ce3..1252269 100644..100755
--- a/silx/utils/testutils.py
+++ b/silx/utils/testutils.py
@@ -141,7 +141,7 @@ class TestLogging(logging.Handler):
self.records = []
- self.count_by_level = {
+ self.expected_count_by_level = {
logging.CRITICAL: critical,
logging.ERROR: error,
logging.WARNING: warning,
@@ -150,6 +150,9 @@ class TestLogging(logging.Handler):
logging.NOTSET: notset
}
+ self._expected_count = sum([v for k, v in self.expected_count_by_level.items() if v is not None])
+ """Amount of any logging expected"""
+
super(TestLogging, self).__init__()
def __enter__(self):
@@ -160,27 +163,65 @@ class TestLogging(logging.Handler):
# ensure no log message is ignored
self.entry_level = self.logger.level * 1
self.logger.setLevel(logging.DEBUG)
+ self.entry_disabled = self.logger.disabled
+ self.logger.disabled = False
+ return self
+
+ def can_be_checked(self):
+ """Returns True if this listener have received enough messages to
+ be valid, and then checked.
+
+ This can be useful for asynchronous wait of messages. It allows process
+ an early break, instead of waiting much time in an active loop.
+ """
+ return len(self.records) >= self._expected_count
+
+ def get_count_by_level(self):
+ """Returns the current message count by level.
+ """
+ count = {
+ logging.CRITICAL: 0,
+ logging.ERROR: 0,
+ logging.WARNING: 0,
+ logging.INFO: 0,
+ logging.DEBUG: 0,
+ logging.NOTSET: 0
+ }
+ for record in self.records:
+ level = record.levelno
+ if level in count:
+ count[level] = count[level] + 1
+ return count
def __exit__(self, exc_type, exc_value, traceback):
"""Context (i.e., with) support"""
self.logger.removeHandler(self)
self.logger.propagate = True
self.logger.setLevel(self.entry_level)
-
- for level, expected_count in self.count_by_level.items():
- if expected_count is None:
- continue
-
- # Number of records for the specified level_str
- count = len([r for r in self.records if r.levelno == level])
- if count != expected_count: # That's an error
- # Resend record logs through logger as they where masked
- # to help debug
- for record in self.records:
- self.logger.handle(record)
- raise RuntimeError(
- 'Expected %d %s logging messages, got %d' % (
- expected_count, logging.getLevelName(level), count))
+ self.logger.disabled = self.entry_disabled
+
+ count_by_level = self.get_count_by_level()
+
+ # Remove keys which does not matter
+ ignored = [r for r, v in self.expected_count_by_level.items() if v is None]
+ expected_count_by_level = dict(self.expected_count_by_level)
+ for i in ignored:
+ del count_by_level[i]
+ del expected_count_by_level[i]
+
+ if count_by_level != expected_count_by_level:
+ # Re-send record logs through logger as they where masked
+ # to help debug
+ message = ""
+ for level in count_by_level.keys():
+ if message != "":
+ message += ", "
+ count = count_by_level[level]
+ expected_count = expected_count_by_level[level]
+ message += "%d %s (got %d)" % (expected_count, logging.getLevelName(level), count)
+
+ raise RuntimeError(
+ 'Expected %s' % message)
def emit(self, record):
"""Override :meth:`logging.Handler.emit`"""