# # main.py: a shared, automated test suite for Subversion # # Subversion is a tool for revision control. # See http://subversion.tigris.org for more information. # # ==================================================================== # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. ###################################################################### import sys import os import shutil import re import stat import subprocess import time import threading import optparse import xml import urllib import logging import hashlib import zipfile import codecs try: # Python >=3.0 import queue from urllib.parse import quote as urllib_parse_quote from urllib.parse import unquote as urllib_parse_unquote from urllib.parse import urlparse except ImportError: # Python <3.0 import Queue as queue from urllib import quote as urllib_parse_quote from urllib import unquote as urllib_parse_unquote from urlparse import urlparse import svntest from svntest import Failure from svntest import Skip SVN_VER_MINOR = 11 ###################################################################### # # HOW TO USE THIS MODULE: # # Write a new python script that # # 1) imports this 'svntest' package # # 2) contains a number of related 'test' routines. (Each test # routine should take no arguments, and return None on success # or throw a Failure exception on failure. Each test should # also contain a short docstring.) # # 3) places all the tests into a list that begins with None. # # 4) calls svntest.main.client_test() on the list. # # Also, your tests will probably want to use some of the common # routines in the 'Utilities' section below. # ##################################################################### # Global stuff default_num_threads = 5 # Don't try to use this before calling execute_tests() logger = None class SVNProcessTerminatedBySignal(Failure): "Exception raised if a spawned process segfaulted, aborted, etc." pass class SVNLineUnequal(Failure): "Exception raised if two lines are unequal" pass class SVNUnmatchedError(Failure): "Exception raised if an expected error is not found" pass class SVNCommitFailure(Failure): "Exception raised if a commit failed" pass class SVNRepositoryCopyFailure(Failure): "Exception raised if unable to copy a repository" pass class SVNRepositoryCreateFailure(Failure): "Exception raised if unable to create a repository" pass # Windows specifics if sys.platform == 'win32': windows = True file_scheme_prefix = 'file:///' _exe = '.exe' _bat = '.bat' os.environ['SVN_DBG_STACKTRACES_TO_STDERR'] = 'y' else: windows = False file_scheme_prefix = 'file://' _exe = '' _bat = '' # The location of our mock svneditor script. if windows: svneditor_script = os.path.join(sys.path[0], 'svneditor.bat') else: svneditor_script = os.path.join(sys.path[0], 'svneditor.py') # Username and password used by the working copies wc_author = 'jrandom' wc_passwd = 'rayjandom' # Username and password used by svnrdump in dump/load cross-checks crosscheck_username = '__dumpster__' crosscheck_password = '__loadster__' # Username and password used by the working copies for "second user" # scenarios wc_author2 = 'jconstant' # use the same password as wc_author stack_trace_regexp = r'(?:.*subversion[\\//].*\.c:[0-9]*,$|.*apr_err=.*)' # Set C locale for command line programs os.environ['LC_ALL'] = 'C' ###################################################################### # Permission constants used with e.g. chmod() and open(). # Define them here at a central location, so people aren't tempted to # use octal literals which are not portable between Python 2 and 3. S_ALL_READ = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH S_ALL_WRITE = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH S_ALL_EXEC = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH S_ALL_RW = S_ALL_READ | S_ALL_WRITE S_ALL_RX = S_ALL_READ | S_ALL_EXEC S_ALL_RWX = S_ALL_READ | S_ALL_WRITE | S_ALL_EXEC ###################################################################### # The locations of the svn binaries. # Use --bin to override these defaults. def P(relpath, head=os.path.dirname(os.path.dirname(os.path.abspath('.'))) ): if sys.platform=='win32': return os.path.join(head, relpath + '.exe') else: return os.path.join(head, relpath) svn_binary = P('svn/svn') svnadmin_binary = P('svnadmin/svnadmin') svnlook_binary = P('svnlook/svnlook') svnrdump_binary = P('svnrdump/svnrdump') svnsync_binary = P('svnsync/svnsync') svnversion_binary = P('svnversion/svnversion') svndumpfilter_binary = P('svndumpfilter/svndumpfilter') svnmucc_binary = P('svnmucc/svnmucc') svnfsfs_binary = P('svnfsfs/svnfsfs') entriesdump_binary = P('tests/cmdline/entries-dump') lock_helper_binary = P('tests/cmdline/lock-helper') atomic_ra_revprop_change_binary = P('tests/cmdline/atomic-ra-revprop-change') wc_lock_tester_binary = P('tests/libsvn_wc/wc-lock-tester') wc_incomplete_tester_binary = P('tests/libsvn_wc/wc-incomplete-tester') del P ###################################################################### # The location of svnauthz binary, relative to the only scripts that # import this file right now (they live in ../). # Use --tools to overide these defaults. svnauthz_binary = os.path.abspath('../../../tools/server-side/svnauthz' + _exe) svnauthz_validate_binary = os.path.abspath( '../../../tools/server-side/svnauthz-validate' + _exe ) svnmover_binary = os.path.abspath('../../../tools/dev/svnmover/svnmover' + _exe) # Location to the pristine repository, will be calculated from test_area_url # when we know what the user specified for --url. pristine_greek_repos_url = None # Global variable to track all of our options options = None # End of command-line-set global variables. ###################################################################### # All temporary repositories and working copies are created underneath # this dir, so there's one point at which to mount, e.g., a ramdisk. work_dir = "svn-test-work" # Constant for the merge info property. SVN_PROP_MERGEINFO = "svn:mergeinfo" # Constant for the inheritable auto-props property. SVN_PROP_INHERITABLE_AUTOPROPS = "svn:auto-props" # Constant for the inheritable ignores property. SVN_PROP_INHERITABLE_IGNORES = "svn:global-ignores" # Where we want all the repositories and working copies to live. # Each test will have its own! general_repo_dir = os.path.join(work_dir, "repositories") general_wc_dir = os.path.join(work_dir, "working_copies") # Directories used for DAV tests other_dav_root_dir = os.path.join(work_dir, "fsdavroot") non_dav_root_dir = os.path.join(work_dir, "nodavroot") # temp directory in which we will create our 'pristine' local # repository and other scratch data. This should be removed when we # quit and when we startup. temp_dir = os.path.join(work_dir, 'local_tmp') # (derivatives of the tmp dir.) pristine_greek_repos_dir = os.path.join(temp_dir, "repos") greek_dump_dir = os.path.join(temp_dir, "greekfiles") default_config_dir = os.path.abspath(os.path.join(temp_dir, "config")) # # Our pristine greek-tree state. # # If a test wishes to create an "expected" working-copy tree, it should # call main.greek_state.copy(). That method will return a copy of this # State object which can then be edited. # _item = svntest.wc.StateItem greek_state = svntest.wc.State('', { 'iota' : _item("This is the file 'iota'.\n"), 'A' : _item(), 'A/mu' : _item("This is the file 'mu'.\n"), 'A/B' : _item(), 'A/B/lambda' : _item("This is the file 'lambda'.\n"), 'A/B/E' : _item(), 'A/B/E/alpha' : _item("This is the file 'alpha'.\n"), 'A/B/E/beta' : _item("This is the file 'beta'.\n"), 'A/B/F' : _item(), 'A/C' : _item(), 'A/D' : _item(), 'A/D/gamma' : _item("This is the file 'gamma'.\n"), 'A/D/G' : _item(), 'A/D/G/pi' : _item("This is the file 'pi'.\n"), 'A/D/G/rho' : _item("This is the file 'rho'.\n"), 'A/D/G/tau' : _item("This is the file 'tau'.\n"), 'A/D/H' : _item(), 'A/D/H/chi' : _item("This is the file 'chi'.\n"), 'A/D/H/psi' : _item("This is the file 'psi'.\n"), 'A/D/H/omega' : _item("This is the file 'omega'.\n"), }) ###################################################################### # Utilities shared by the tests def wrap_ex(func, output): "Wrap a function, catch, print and ignore exceptions" def w(*args, **kwds): try: return func(*args, **kwds) except Failure as ex: if ex.__class__ != Failure or ex.args: ex_args = str(ex) if ex_args: logger.warn('EXCEPTION: %s: %s', ex.__class__.__name__, ex_args) else: logger.warn('EXCEPTION: %s', ex.__class__.__name__) return w def setup_development_mode(): "Wraps functions in module actions" l = [ 'run_and_verify_svn', 'run_and_verify_svnversion', 'run_and_verify_load', 'run_and_verify_dump', 'run_and_verify_checkout', 'run_and_verify_export', 'run_and_verify_update', 'run_and_verify_merge', 'run_and_verify_switch', 'run_and_verify_commit', 'run_and_verify_unquiet_status', 'run_and_verify_status', 'run_and_verify_diff_summarize', 'run_and_verify_diff_summarize_xml', 'run_and_validate_lock'] for func in l: setattr(svntest.actions, func, wrap_ex(getattr(svntest.actions, func))) def get_admin_name(): "Return name of SVN administrative subdirectory." if (windows or sys.platform == 'cygwin') \ and 'SVN_ASP_DOT_NET_HACK' in os.environ: return '_svn' else: return '.svn' def wc_is_singledb(wcpath): """Temporary function that checks whether a working copy directory looks like it is part of a single-db working copy.""" pristine = os.path.join(wcpath, get_admin_name(), 'pristine') if not os.path.exists(pristine): return True # Now we must be looking at a multi-db WC dir or the root dir of a # single-DB WC. Sharded 'pristine' dir => single-db, else => multi-db. for name in os.listdir(pristine): if len(name) == 2: return True elif len(name) == 40: return False return False def get_start_commit_hook_path(repo_dir): "Return the path of the start-commit-hook conf file in REPO_DIR." return os.path.join(repo_dir, "hooks", "start-commit") def get_pre_commit_hook_path(repo_dir): "Return the path of the pre-commit-hook conf file in REPO_DIR." return os.path.join(repo_dir, "hooks", "pre-commit") def get_post_commit_hook_path(repo_dir): "Return the path of the post-commit-hook conf file in REPO_DIR." return os.path.join(repo_dir, "hooks", "post-commit") def get_pre_revprop_change_hook_path(repo_dir): "Return the path of the pre-revprop-change hook script in REPO_DIR." return os.path.join(repo_dir, "hooks", "pre-revprop-change") def get_pre_lock_hook_path(repo_dir): "Return the path of the pre-lock hook script in REPO_DIR." return os.path.join(repo_dir, "hooks", "pre-lock") def get_pre_unlock_hook_path(repo_dir): "Return the path of the pre-unlock hook script in REPO_DIR." return os.path.join(repo_dir, "hooks", "pre-unlock") def get_svnserve_conf_file_path(repo_dir): "Return the path of the svnserve.conf file in REPO_DIR." return os.path.join(repo_dir, "conf", "svnserve.conf") def get_fsfs_conf_file_path(repo_dir): "Return the path of the fsfs.conf file in REPO_DIR." return os.path.join(repo_dir, "db", "fsfs.conf") def get_fsfs_format_file_path(repo_dir): "Return the path of the format file in REPO_DIR." return os.path.join(repo_dir, "db", "format") def ensure_list(item): "If ITEM is not already a list, convert it to a list." if isinstance(item, list): return item elif isinstance(item, bytes) or isinstance(item, str): return [ item ] else: return list(item) def filter_dbg(lines, binary = False): if binary: excluded = filter(lambda line: line.startswith(b'DBG:'), lines) excluded = map(bytes.decode, excluded) included = filter(lambda line: not line.startswith(b'DBG:'), lines) else: excluded = filter(lambda line: line.startswith('DBG:'), lines) included = filter(lambda line: not line.startswith('DBG:'), lines) sys.stdout.write(''.join(excluded)) return ensure_list(included) # Run any binary, logging the command line and return code def run_command(command, error_expected, binary_mode=False, *varargs): """Run COMMAND with VARARGS. Return exit code as int; stdout, stderr as lists of lines (including line terminators). See run_command_stdin() for details. If ERROR_EXPECTED is None, any stderr output will be printed and any stderr output or a non-zero exit code will raise an exception.""" return run_command_stdin(command, error_expected, 0, binary_mode, None, *varargs) # Frequently used constants: # If any of these relative path strings show up in a server response, # then we can assume that the on-disk repository path was leaked to the # client. Having these here as constants means we don't need to construct # them over and over again. _repos_diskpath1 = os.path.join('cmdline', 'svn-test-work', 'repositories') _repos_diskpath2 = os.path.join('cmdline', 'svn-test-work', 'local_tmp', 'repos') _repos_diskpath1_bytes = _repos_diskpath1.encode() _repos_diskpath2_bytes = _repos_diskpath2.encode() # A regular expression that matches arguments that are trivially safe # to pass on a command line without quoting on any supported operating # system: _safe_arg_re = re.compile(r'^[A-Za-z\d\.\_\/\-\:\@]+$') def _quote_arg(arg): """Quote ARG for a command line. Return a quoted version of the string ARG, or just ARG if it contains only universally harmless characters. WARNING: This function cannot handle arbitrary command-line arguments: it is just good enough for what we need here.""" arg = str(arg) if _safe_arg_re.match(arg): return arg if windows: # Note: subprocess.list2cmdline is Windows-specific. return subprocess.list2cmdline([arg]) else: # Quoting suitable for most Unix shells. return "'" + arg.replace("'", "'\\''") + "'" def open_pipe(command, bufsize=-1, stdin=None, stdout=None, stderr=None): """Opens a subprocess.Popen pipe to COMMAND using STDIN, STDOUT, and STDERR. BUFSIZE is passed to subprocess.Popen's argument of the same name. Returns (infile, outfile, errfile, waiter); waiter should be passed to wait_on_pipe.""" command = [str(x) for x in command] # Always run python scripts under the same Python executable as used # for the test suite. if command[0].endswith('.py'): command.insert(0, sys.executable) command_string = command[0] + ' ' + ' '.join(map(_quote_arg, command[1:])) if not stdin: stdin = subprocess.PIPE if not stdout: stdout = subprocess.PIPE if not stderr: stderr = subprocess.PIPE p = subprocess.Popen(command, bufsize, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=not windows) return p.stdin, p.stdout, p.stderr, (p, command_string) def wait_on_pipe(waiter, binary_mode, stdin=None): """WAITER is (KID, COMMAND_STRING). Wait for KID (opened with open_pipe) to finish, dying if it does. If KID fails, create an error message containing any stdout and stderr from the kid. Show COMMAND_STRING in diagnostic messages. Normalize Windows line endings of stdout and stderr if not BINARY_MODE. Return KID's exit code as int; stdout, stderr as lists of lines (including line terminators).""" if waiter is None: return kid, command_string = waiter stdout, stderr = kid.communicate(stdin) exit_code = kid.returncode # We always expect STDERR to be strings, not byte-arrays. if not isinstance(stderr, str): stderr = stderr.decode("utf-8") if not binary_mode: if not isinstance(stdout, str): stdout = stdout.decode("utf-8") # Normalize Windows line endings if in text mode. if windows: stdout = stdout.replace('\r\n', '\n') stderr = stderr.replace('\r\n', '\n') # Convert output strings to lists. stdout_lines = stdout.splitlines(True) stderr_lines = stderr.splitlines(True) if exit_code < 0: if not windows: exit_signal = os.WTERMSIG(-exit_code) else: exit_signal = exit_code if stdout_lines is not None: logger.info("".join(stdout_lines)) if stderr_lines is not None: logger.warning("".join(stderr_lines)) # show the whole path to make it easier to start a debugger logger.warning("CMD: %s terminated by signal %d" % (command_string, exit_signal)) raise SVNProcessTerminatedBySignal else: if exit_code: logger.info("CMD: %s exited with %d" % (command_string, exit_code)) return stdout_lines, stderr_lines, exit_code def spawn_process(command, bufsize=-1, binary_mode=False, stdin_lines=None, *varargs): """Run any binary, supplying input text, logging the command line. BUFSIZE dictates the pipe buffer size used in communication with the subprocess: quoting from subprocess.Popen(), "0 means unbuffered, 1 means line buffered, any other positive value means use a buffer of (approximately) that size. A negative bufsize means to use the system default, which usually means fully buffered." Normalize Windows line endings of stdout and stderr if not BINARY_MODE. Return exit code as int; stdout, stderr as lists of lines (including line terminators). """ if stdin_lines and not isinstance(stdin_lines, list): raise TypeError("stdin_lines should have list type") # Log the command line if not command.endswith('.py'): logger.info('CMD: %s %s' % (os.path.basename(command), ' '.join([_quote_arg(x) for x in varargs]))) infile, outfile, errfile, kid = open_pipe([command] + list(varargs), bufsize) if stdin_lines: for x in stdin_lines: infile.write(x) stdout_lines, stderr_lines, exit_code = wait_on_pipe(kid, binary_mode) infile.close() outfile.close() errfile.close() return exit_code, stdout_lines, stderr_lines def run_command_stdin(command, error_expected, bufsize=-1, binary_mode=False, stdin_lines=None, *varargs): """Run COMMAND with VARARGS; input STDIN_LINES (a list of strings which should include newline characters) to program via stdin - this should not be very large, as if the program outputs more than the OS is willing to buffer, this will deadlock, with both Python and COMMAND waiting to write to each other for ever. For tests where this is a problem, setting BUFSIZE to a sufficiently large value will prevent the deadlock, see spawn_process(). Normalize Windows line endings of stdout and stderr if not BINARY_MODE. Return exit code as int; stdout, stderr as lists of lines (including line terminators). If ERROR_EXPECTED is None, any stderr output will be printed and any stderr output or a non-zero exit code will raise an exception.""" start = time.time() exit_code, stdout_lines, stderr_lines = spawn_process(command, bufsize, binary_mode, stdin_lines, *varargs) def _line_contains_repos_diskpath(line): # ### Note: this assumes that either svn-test-work isn't a symlink, # ### or the diskpath isn't realpath()'d somewhere on the way from # ### the server's configuration and the client's stderr. We could # ### check for both the symlinked path and the realpath. if isinstance(line, str): return _repos_diskpath1 in line or _repos_diskpath2 in line else: return _repos_diskpath1_bytes in line or _repos_diskpath2_bytes in line for lines, name in [[stdout_lines, "stdout"], [stderr_lines, "stderr"]]: if is_ra_type_file() or 'svnadmin' in command or 'svnlook' in command: break # Does the server leak the repository on-disk path? # (prop_tests-12 installs a hook script that does that intentionally) if any(map(_line_contains_repos_diskpath, lines)) \ and not any(map(lambda arg: 'prop_tests-12' in arg, varargs)): raise Failure("Repository diskpath in %s: %r" % (name, lines)) valgrind_diagnostic = False # A valgrind diagnostic will raise a failure if the command is # expected to run without error. When an error is expected any # subsequent error pattern matching is usually lenient and will not # detect the diagnostic so make sure a failure is raised here. if error_expected and stderr_lines: if any(map(lambda arg: re.match('==[0-9]+==', arg), stderr_lines)): valgrind_diagnostic = True stop = time.time() logger.info('