diff options
Diffstat (limited to 'silx/utils/testutils.py')
-rwxr-xr-x[-rw-r--r--] | silx/utils/testutils.py | 73 |
1 files changed, 57 insertions, 16 deletions
diff --git a/silx/utils/testutils.py b/silx/utils/testutils.py index 82c2ce3..1252269 100644..100755 --- a/silx/utils/testutils.py +++ b/silx/utils/testutils.py @@ -141,7 +141,7 @@ class TestLogging(logging.Handler): self.records = [] - self.count_by_level = { + self.expected_count_by_level = { logging.CRITICAL: critical, logging.ERROR: error, logging.WARNING: warning, @@ -150,6 +150,9 @@ class TestLogging(logging.Handler): logging.NOTSET: notset } + self._expected_count = sum([v for k, v in self.expected_count_by_level.items() if v is not None]) + """Amount of any logging expected""" + super(TestLogging, self).__init__() def __enter__(self): @@ -160,27 +163,65 @@ class TestLogging(logging.Handler): # ensure no log message is ignored self.entry_level = self.logger.level * 1 self.logger.setLevel(logging.DEBUG) + self.entry_disabled = self.logger.disabled + self.logger.disabled = False + return self + + def can_be_checked(self): + """Returns True if this listener have received enough messages to + be valid, and then checked. + + This can be useful for asynchronous wait of messages. It allows process + an early break, instead of waiting much time in an active loop. + """ + return len(self.records) >= self._expected_count + + def get_count_by_level(self): + """Returns the current message count by level. + """ + count = { + logging.CRITICAL: 0, + logging.ERROR: 0, + logging.WARNING: 0, + logging.INFO: 0, + logging.DEBUG: 0, + logging.NOTSET: 0 + } + for record in self.records: + level = record.levelno + if level in count: + count[level] = count[level] + 1 + return count def __exit__(self, exc_type, exc_value, traceback): """Context (i.e., with) support""" self.logger.removeHandler(self) self.logger.propagate = True self.logger.setLevel(self.entry_level) - - for level, expected_count in self.count_by_level.items(): - if expected_count is None: - continue - - # Number of records for the specified level_str - count = len([r for r in self.records if r.levelno == level]) - if count != expected_count: # That's an error - # Resend record logs through logger as they where masked - # to help debug - for record in self.records: - self.logger.handle(record) - raise RuntimeError( - 'Expected %d %s logging messages, got %d' % ( - expected_count, logging.getLevelName(level), count)) + self.logger.disabled = self.entry_disabled + + count_by_level = self.get_count_by_level() + + # Remove keys which does not matter + ignored = [r for r, v in self.expected_count_by_level.items() if v is None] + expected_count_by_level = dict(self.expected_count_by_level) + for i in ignored: + del count_by_level[i] + del expected_count_by_level[i] + + if count_by_level != expected_count_by_level: + # Re-send record logs through logger as they where masked + # to help debug + message = "" + for level in count_by_level.keys(): + if message != "": + message += ", " + count = count_by_level[level] + expected_count = expected_count_by_level[level] + message += "%d %s (got %d)" % (expected_count, logging.getLevelName(level), count) + + raise RuntimeError( + 'Expected %s' % message) def emit(self, record): """Override :meth:`logging.Handler.emit`""" |