diff options
Diffstat (limited to 'tests/compare.py')
-rw-r--r-- | tests/compare.py | 98 |
1 files changed, 42 insertions, 56 deletions
diff --git a/tests/compare.py b/tests/compare.py index 56122aa..6a92fd1 100644 --- a/tests/compare.py +++ b/tests/compare.py @@ -24,7 +24,10 @@ import yaml import sys import tempfile -from sets import Set as set +try: + set +except NameError: + from sets import Set as set if os.path.exists('scripts/dtrx') and os.path.exists('tests'): os.chdir('tests') @@ -34,16 +37,10 @@ else: print "ERROR: Can't run tests in this directory!" sys.exit(2) -X_SCRIPT = os.path.realpath('../scripts/dtrx') +DTRX_SCRIPT = os.path.realpath('../scripts/dtrx') +SHELL_CMD = ['sh', '-se'] ROOT_DIR = os.path.realpath(os.curdir) OUTCOMES = ['error', 'failed', 'passed'] -TESTSCRIPT_NAME = 'testscript.sh' -SCRIPT_PROLOGUE = """#!/bin/sh -set -e -""" - -input_buffer = tempfile.TemporaryFile() -output_buffer = tempfile.TemporaryFile() class ExtractorTestError(Exception): pass @@ -62,25 +59,27 @@ class ExtractorTest(object): if isinstance(value, str): value = [value] setattr(self, key, value) - - def get_results(self, commands, stdin=None): - print >>output_buffer, "Output from %s:" % (' '.join(commands),) - output_buffer.flush() - status = subprocess.call(commands, stdout=output_buffer, - stderr=output_buffer, stdin=stdin) - process = subprocess.Popen(['find', '!', '-name', TESTSCRIPT_NAME], - stdout=subprocess.PIPE) - process.wait() + if self.input and (not self.input.endswith('\n')): + self.input = self.input + '\n' + + def start_proc(self, command, stdin=None, output=None): + process = subprocess.Popen(command, stdin=subprocess.PIPE, + stdout=output, stderr=output) + if stdin: + process.stdin.write(stdin) + process.stdin.close() + return process + + def get_results(self, command, stdin=None): + print >>self.outbuffer, "Output from %s:" % (' '.join(command),) + self.outbuffer.flush() + status = self.start_proc(command, stdin, self.outbuffer).wait() + process = subprocess.Popen(['find'], stdout=subprocess.PIPE) output = process.stdout.read(-1) process.stdout.close() + process.wait() return status, set(output.split('\n')) - def write_script(self, commands): - script = open(TESTSCRIPT_NAME, 'w') - script.write("%s%s\n" % (SCRIPT_PROLOGUE, commands)) - script.close() - subprocess.call(['chmod', 'u+w', TESTSCRIPT_NAME]) - def run_script(self, key): commands = getattr(self, key) if commands is not None: @@ -88,38 +87,27 @@ class ExtractorTest(object): directory_hint = '../' else: directory_hint = '' - self.write_script(commands) - subprocess.call(['sh', TESTSCRIPT_NAME, directory_hint]) + self.start_proc(SHELL_CMD + [directory_hint], commands) def get_shell_results(self): self.run_script('prerun') - self.write_script(self.baseline) - return self.get_results(['sh', TESTSCRIPT_NAME] + self.filenames) + return self.get_results(SHELL_CMD + self.filenames, self.baseline) def get_extractor_results(self): self.run_script('prerun') - input_buffer.seek(0, 0) - input_buffer.truncate() - if self.input: - input_buffer.write(self.input) - if not self.input.endswith('\n'): - input_buffer.write('\n') - input_buffer.seek(0, 0) - input_buffer.flush() - return self.get_results([X_SCRIPT] + self.options + self.filenames, - input_buffer) + return self.get_results([DTRX_SCRIPT] + self.options + self.filenames, + self.input) def get_posttest_result(self): if not self.posttest: return 0 - self.write_script(self.posttest) - return subprocess.call(['sh', TESTSCRIPT_NAME]) + return self.start_proc(SHELL_CMD, self.posttest).wait() def clean(self): self.run_script('cleanup') if self.directory: target = os.path.join(ROOT_DIR, self.directory) - extra_options = ['!', '-name', TESTSCRIPT_NAME] + extra_options = [] else: target = ROOT_DIR extra_options = ['(', '(', '-type', 'd', @@ -138,8 +126,8 @@ class ExtractorTest(object): def show_status(self, status, message=None): raw_status = status.lower() if raw_status != 'passed': - output_buffer.seek(0, 0) - sys.stdout.write(output_buffer.read(-1)) + self.outbuffer.seek(0, 0) + sys.stdout.write(self.outbuffer.read(-1)) if message is None: last_part = '' else: @@ -153,13 +141,13 @@ class ExtractorTest(object): status, expected = self.get_shell_results() self.clean() if expected != actual: - print >>output_buffer, "Only in baseline results:" - print >>output_buffer, '\n'.join(expected.difference(actual)) - print >>output_buffer, "Only in actual results:" - print >>output_buffer, '\n'.join(actual.difference(expected)) + print >>self.outbuffer, "Only in baseline results:" + print >>self.outbuffer, '\n'.join(expected.difference(actual)) + print >>self.outbuffer, "Only in actual results:" + print >>self.outbuffer, '\n'.join(actual.difference(expected)) return self.show_status('FAILED') elif posttest_result != 0: - print >>output_buffer, "Posttest gave status code", posttest_result + print >>self.outbuffer, "Posttest gave status code", posttest_result return self.show_status('FAILED') return self.show_status('Passed') @@ -187,24 +175,23 @@ class ExtractorTest(object): return None def check_results(self): - output_buffer.seek(0, 0) - output_buffer.truncate() self.clean() status, actual = self.get_extractor_results() - output_buffer.seek(0, 0) - output_buffer.readline() - output = output_buffer.read(-1) + self.outbuffer.seek(0, 0) + self.outbuffer.readline() + output = self.outbuffer.read(-1) problem = (self.have_error_mismatch(status) or self.check_output(output) or self.grep_output(output)) if problem: return self.show_status('FAILED', problem) - if self.baseline: + if self.baseline is not None: return self.compare_results(actual) else: self.clean() return self.show_status('Passed') def run(self): + self.outbuffer = tempfile.TemporaryFile() if self.directory: os.mkdir(self.directory) os.chdir(self.directory) @@ -212,6 +199,7 @@ class ExtractorTest(object): result = self.check_results() except ExtractorTestError, error: result = self.show_status('ERROR', error) + self.outbuffer.close() if self.directory: os.chdir(ROOT_DIR) subprocess.call(['chmod', '-R', '700', self.directory]) @@ -240,5 +228,3 @@ for outcome in OUTCOMES: for result in results: counts[result] += 1 print " Totals:", ', '.join(["%s %s" % (counts[key], key) for key in OUTCOMES]) -input_buffer.close() -output_buffer.close() |