summaryrefslogtreecommitdiff
path: root/scripts/dtrx
diff options
context:
space:
mode:
authorDmitry Bogatov <KAction@debian.org>2018-12-02 05:36:55 +0000
committerDmitry Bogatov <KAction@debian.org>2018-12-02 05:36:55 +0000
commitd621b3047b2b9dd96c952b8e0c420368796672eb (patch)
tree338b6702f16dce8e1c6a31df14dcb5cb77bc3404 /scripts/dtrx
Import Upstream version 6.4
Diffstat (limited to 'scripts/dtrx')
-rwxr-xr-xscripts/dtrx1210
1 files changed, 1210 insertions, 0 deletions
diff --git a/scripts/dtrx b/scripts/dtrx
new file mode 100755
index 0000000..70e7965
--- /dev/null
+++ b/scripts/dtrx
@@ -0,0 +1,1210 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# dtrx -- Intelligently extract various archive types.
+# Copyright ⓒ 2006, 2007, 2008 Brett Smith <brettcsmith@brettcsmith.org>
+# Copyright ⓒ 2008 Peter Kelemen <Peter.Kelemen@gmail.com>
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 3 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, see <http://www.gnu.org/licenses/>.
+
+# Python 2.3 string methods: 'rfind', 'rindex', 'rjust', 'rstrip'
+
+import errno
+import logging
+import mimetypes
+import optparse
+import os
+import re
+import shutil
+import signal
+import stat
+import subprocess
+import sys
+import tempfile
+import textwrap
+import traceback
+
# Compatibility shim: the builtin 'set' type only exists on Python 2.4+;
# older interpreters fall back to the deprecated 'sets' module.
try:
    set
except NameError:
    from sets import Set as set
+
VERSION = "6.4"
VERSION_BANNER = """dtrx version %s
Copyright ⓒ 2006, 2007, 2008 Brett Smith <brettcsmith@brettcsmith.org>
Copyright ⓒ 2008 Peter Kelemen <Peter.Kelemen@gmail.com>

This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 3 of the License, or (at your
option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
Public License for more details.""" % (VERSION,)

# Archive content classifications, assigned by the extractors'
# check_contents() methods.
MATCHING_DIRECTORY = 1   # one directory whose name matches the archive's
ONE_ENTRY_KNOWN = 2      # one entry, and its name is already decided
BOMB = 3                 # several top-level entries (a "tarbomb")
EMPTY = 4                # archive contained nothing
ONE_ENTRY_FILE = 'file'
ONE_ENTRY_DIRECTORY = 'directory'

ONE_ENTRY_UNKNOWN = [ONE_ENTRY_FILE, ONE_ENTRY_DIRECTORY]

# Policies for handling a single-entry archive (OneEntryPolicy answers).
EXTRACT_HERE = 1
EXTRACT_WRAP = 2
EXTRACT_RENAME = 3

# Policies for recursing into archives found inside an archive
# (RecursionPolicy answers).
RECURSE_ALWAYS = 1
RECURSE_ONCE = 2
RECURSE_NOT_NOW = 3
RECURSE_NEVER = 4
RECURSE_LIST = 5

# Teach the mimetypes module about encodings/types it may not know.
mimetypes.encodings_map.setdefault('.bz2', 'bzip2')
mimetypes.encodings_map.setdefault('.lzma', 'lzma')
mimetypes.types_map.setdefault('.gem', 'application/x-ruby-gem')

logger = logging.getLogger('dtrx-log')
+
class FilenameChecker(object):
    """Find a free name, derived from a desired name, for a new file.

    check() returns the original name if unused, otherwise the first free
    'name.1' .. 'name.9' variant, finally falling back to a
    tempfile-generated name.  Because is_free() reserves the name by
    actually creating the file (O_CREAT | O_EXCL), the returned name is
    claimed, not merely probed.
    """
    free_func = os.open
    free_args = (os.O_CREAT | os.O_EXCL,)
    free_close = os.close

    def __init__(self, original_name):
        self.original_name = original_name

    def is_free(self, filename):
        # Atomically create the file; EEXIST means the name is taken.
        try:
            result = self.free_func(filename, *self.free_args)
        except OSError, error:
            if error.errno == errno.EEXIST:
                return False
            raise
        if self.free_close:
            self.free_close(result)
        return True

    def create(self):
        # Last resort: let tempfile pick a unique suffix in the cwd.
        fd, filename = tempfile.mkstemp(prefix=self.original_name + '.',
                                        dir='.')
        os.close(fd)
        return filename

    def check(self):
        # Try the bare name, then '.1' through '.9', then give up and
        # generate one.
        for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]:
            filename = '%s%s' % (self.original_name, suffix)
            if self.is_free(filename):
                return filename
        return self.create()
+
+
class DirectoryChecker(FilenameChecker):
    """FilenameChecker variant that reserves directories via os.mkdir."""
    free_func = os.mkdir
    free_args = ()
    free_close = None

    def create(self):
        return tempfile.mkdtemp(prefix=self.original_name + '.', dir='.')
+
+
class ExtractorError(Exception):
    """Raised when an extraction attempt fails."""
    pass
+
+
class ExtractorUnusable(Exception):
    """Raised when a required external tool cannot be run at all."""
    pass
+
+
+EXTRACTION_ERRORS = (ExtractorError, ExtractorUnusable, OSError, IOError)
+
class BaseExtractor(object):
    """Run an archive through a pipeline of external commands.

    Subclasses provide extract_pipe/list_pipe (or override
    extract_archive()/get_filenames()) and may override prepare() to
    queue commands that run before the main pipe.  Extraction happens
    inside a fresh hidden temporary directory (self.target) in the
    current directory; a handler class later moves the results into
    place.
    """
    # Decompression commands, keyed by mimetypes encoding name.
    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat',
                'lzma': 'lzcat'}
    name_checker = DirectoryChecker

    def __init__(self, filename, encoding):
        if encoding and (not self.decoders.has_key(encoding)):
            raise ValueError("unrecognized encoding %s" % (encoding,))
        self.filename = os.path.realpath(filename)
        self.encoding = encoding
        self.file_count = 0
        self.included_archives = []
        self.target = None
        self.content_type = None
        self.content_name = None
        self.pipes = []
        # stderr of every subprocess is collected into one temp file.
        self.stderr = tempfile.TemporaryFile()
        self.exit_codes = []
        try:
            self.archive = open(filename, 'r')
        except (IOError, OSError), error:
            raise ExtractorError("could not open %s: %s" %
                                 (filename, error.strerror))
        if encoding:
            self.pipe([self.decoders[encoding]], "decoding")
        self.prepare()

    def pipe(self, command, description="extraction"):
        # Queue a command; run_pipes() actually executes the chain.
        self.pipes.append((command, description))

    def first_bad_exit_code(self):
        # Index of the first pipeline stage that exited non-zero, or None.
        for index, code in enumerate(self.exit_codes):
            if code != 0:
                return index
        return None

    def run_pipes(self, final_stdout=None):
        """Execute the queued commands as a shell-style pipeline.

        The first stage reads from self.archive; the last writes to
        final_stdout (a temporary file by default).  Afterwards
        self.archive is replaced by the final output and self.exit_codes
        records each stage's status.
        """
        if not self.pipes:
            return
        elif final_stdout is None:
            # FIXME: Buffering this might be dumb.
            final_stdout = tempfile.TemporaryFile()
        num_pipes = len(self.pipes)
        last_pipe = num_pipes - 1
        processes = []
        for index, command in enumerate([pipe[0] for pipe in self.pipes]):
            if index == 0:
                stdin = self.archive
            else:
                stdin = processes[-1].stdout
            if index == last_pipe:
                stdout = final_stdout
            else:
                stdout = subprocess.PIPE
            try:
                processes.append(subprocess.Popen(command, stdin=stdin,
                                                  stdout=stdout,
                                                  stderr=self.stderr))
            except OSError, error:
                if error.errno == errno.ENOENT:
                    raise ExtractorUnusable("could not run %s" % (command[0],))
                raise
        self.exit_codes = [pipe.wait() for pipe in processes]
        self.archive.close()
        for index in range(last_pipe):
            processes[index].stdout.close()
        self.archive = final_stdout

    def prepare(self):
        # Hook for subclasses to queue commands ahead of the main pipe.
        pass

    def check_included_archives(self):
        """Count extracted files and note any that look like archives.

        Fills self.file_count and self.included_archives (paths relative
        to self.included_root), used later for recursive extraction.
        """
        if (self.content_name is None) or (not self.content_name.endswith('/')):
            self.included_root = './'
        else:
            self.included_root = self.content_name
        start_index = len(self.included_root)
        for path, dirname, filenames in os.walk(self.included_root):
            self.file_count += len(filenames)
            path = path[start_index:]
            for filename in filenames:
                if (ExtractorBuilder.try_by_mimetype(filename) or
                    ExtractorBuilder.try_by_extension(filename)):
                    self.included_archives.append(os.path.join(path, filename))

    def check_contents(self):
        # Classify what the archive produced (see the module constants).
        if not self.contents:
            self.content_type = EMPTY
        elif len(self.contents) == 1:
            if self.basename() == self.contents[0]:
                self.content_type = MATCHING_DIRECTORY
            elif os.path.isdir(self.contents[0]):
                self.content_type = ONE_ENTRY_DIRECTORY
            else:
                self.content_type = ONE_ENTRY_FILE
            # A trailing '/' marks directories in content_name.
            self.content_name = self.contents[0]
            if os.path.isdir(self.contents[0]):
                self.content_name += '/'
        else:
            self.content_type = BOMB
        self.check_included_archives()

    def basename(self):
        """Archive filename with recognized encoding/type extensions
        stripped (e.g. 'foo.tar.gz' -> 'foo')."""
        pieces = os.path.basename(self.filename).split('.')
        extension = '.' + pieces[-1]
        if mimetypes.encodings_map.has_key(extension):
            pieces.pop()
            extension = '.' + pieces[-1]
        if (mimetypes.types_map.has_key(extension) or
            mimetypes.common_types.has_key(extension) or
            mimetypes.suffix_map.has_key(extension)):
            pieces.pop()
        return '.'.join(pieces)

    def get_stderr(self):
        # Return (and consume) everything the pipeline wrote to stderr.
        self.stderr.seek(0, 0)
        errors = self.stderr.read(-1)
        self.stderr.close()
        return errors

    def check_success(self, got_output):
        # Non-zero exit codes are fatal only if we also got no output;
        # some tools exit non-zero after a mostly-successful extraction.
        error_index = self.first_bad_exit_code()
        if (not got_output) and (error_index is not None):
            command = ' '.join(self.pipes[error_index][0])
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (self.pipes[error_index][1], command,
                                  self.exit_codes[error_index]))

    def extract_archive(self):
        self.pipe(self.extract_pipe)
        self.run_pipes()

    def extract(self):
        """Extract into a fresh '.dtrx-*' directory in the cwd.

        On any extraction error the temporary directory is removed and
        the exception re-raised.  The cwd is restored either way.
        """
        try:
            self.target = tempfile.mkdtemp(prefix='.dtrx-', dir='.')
        except (OSError, IOError), error:
            raise ExtractorError("cannot extract here: %s" % (error.strerror,))
        old_path = os.path.realpath(os.curdir)
        os.chdir(self.target)
        try:
            self.archive.seek(0, 0)
            self.extract_archive()
            self.contents = os.listdir('.')
            self.check_contents()
            self.check_success(self.content_type != EMPTY)
        except EXTRACTION_ERRORS:
            self.archive.close()
            os.chdir(old_path)
            shutil.rmtree(self.target, ignore_errors=True)
            raise
        self.archive.close()
        os.chdir(old_path)

    def get_filenames(self):
        # Generator over the archive's member names, via list_pipe.
        self.pipe(self.list_pipe, "listing")
        self.run_pipes()
        self.check_success(False)
        self.archive.seek(0, 0)
        while True:
            line = self.archive.readline()
            if not line:
                self.archive.close()
                return
            yield line.rstrip('\n')
+
+
class CompressionExtractor(BaseExtractor):
    """Extractor for plain compressed files (gz/bz2/Z/lzma): the result
    is a single output file, not a directory tree."""
    file_type = 'compressed file'
    name_checker = FilenameChecker

    def basename(self):
        # Strip only the encoding extension ('foo.gz' -> 'foo').
        pieces = os.path.basename(self.filename).split('.')
        extension = '.' + pieces[-1]
        if mimetypes.encodings_map.has_key(extension):
            pieces.pop()
        return '.'.join(pieces)

    def get_filenames(self):
        # This code used to just immediately yield the basename, under the
        # assumption that that would be the filename.  However, if that
        # happens, dtrx -l will report this as a valid result for files with
        # compression extensions, even if those files shouldn't actually be
        # handled this way.  So, we call out to the file command to do a quick
        # check and make sure this actually looks like a compressed file.
        if 'compress' not in [match[0] for match in
                              ExtractorBuilder.try_by_magic(self.filename)]:
            raise ExtractorError("doesn't look like a compressed file")
        yield self.basename()

    def extract(self):
        # Decompress into a single hidden temp file in the cwd; a
        # handler renames it afterwards.
        self.content_type = ONE_ENTRY_KNOWN
        self.content_name = self.basename()
        self.contents = None
        self.included_root = './'
        try:
            output_fd, self.target = tempfile.mkstemp(prefix='.dtrx-', dir='.')
        except (OSError, IOError), error:
            raise ExtractorError("cannot extract here: %s" % (error.strerror,))
        self.run_pipes(output_fd)
        os.close(output_fd)
        try:
            # Success test: decompression produced at least one byte.
            self.check_success(os.stat(self.target)[stat.ST_SIZE] > 0)
        except EXTRACTION_ERRORS:
            os.unlink(self.target)
            raise
+
class TarExtractor(BaseExtractor):
    """Extractor for tar archives (tar reads the archive from stdin)."""
    file_type = 'tar file'
    extract_pipe = ['tar', '-x']
    list_pipe = ['tar', '-t']
+
+
class CpioExtractor(BaseExtractor):
    """Extractor for cpio archives (cpio reads from stdin)."""
    file_type = 'cpio file'
    extract_pipe = ['cpio', '-i', '--make-directories', '--quiet',
                    '--no-absolute-filenames']
    list_pipe = ['cpio', '-t', '--quiet']
+
+
class RPMExtractor(CpioExtractor):
    """Extractor for RPM packages: rpm2cpio converts the payload, then
    the inherited cpio pipeline unpacks it."""
    file_type = 'RPM'

    def prepare(self):
        self.pipe(['rpm2cpio', '-'], "rpm2cpio")

    def basename(self):
        # Strip the '.rpm' extension, and then what is presumably the
        # architecture component if it's short enough (e.g.
        # 'foo-1.0.i386.rpm' -> 'foo-1.0').
        pieces = os.path.basename(self.filename).split('.')
        if len(pieces) == 1:
            return pieces[0]
        elif pieces[-1] != 'rpm':
            return BaseExtractor.basename(self)
        pieces.pop()
        if len(pieces) == 1:
            return pieces[0]
        elif len(pieces[-1]) < 8:
            pieces.pop()
        return '.'.join(pieces)

    def check_contents(self):
        # RPM payloads never match the package name; always treat as a
        # bomb so contents land in their own directory.
        self.check_included_archives()
        self.content_type = BOMB
+
+
class DebExtractor(TarExtractor):
    """Extractor for Debian packages: 'ar' pulls data.tar.gz out of the
    .deb, zcat decompresses it, and the inherited tar pipe unpacks it."""
    file_type = 'Debian package'

    def prepare(self):
        self.pipe(['ar', 'p', self.filename, 'data.tar.gz'],
                  "data.tar.gz extraction")
        self.pipe(['zcat'], "data.tar.gz decompression")

    def basename(self):
        # 'name_version_arch.deb' -> 'name_version'; anything that
        # doesn't look like that falls back to the generic rule.
        pieces = os.path.basename(self.filename).split('_')
        if len(pieces) == 1:
            return pieces[0]
        last_piece = pieces.pop()
        if (len(last_piece) > 10) or (not last_piece.endswith('.deb')):
            return BaseExtractor.basename(self)
        return '_'.join(pieces)

    def check_contents(self):
        # Package contents never match the package name; always a bomb.
        self.check_included_archives()
        self.content_type = BOMB
+
+
class DebMetadataExtractor(DebExtractor):
    """DebExtractor variant that unpacks control.tar.gz (the package
    metadata) instead of the data payload; used for --metadata."""
    def prepare(self):
        self.pipe(['ar', 'p', self.filename, 'control.tar.gz'],
                  "control.tar.gz extraction")
        self.pipe(['zcat'], "control.tar.gz decompression")
+
+
class GemExtractor(TarExtractor):
    """Extractor for Ruby gems: the gem is a tar containing data.tar.gz,
    which is extracted, decompressed, and untarred."""
    file_type = 'Ruby gem'

    def prepare(self):
        self.pipe(['tar', '-xO', 'data.tar.gz'], "data.tar.gz extraction")
        self.pipe(['zcat'], "data.tar.gz decompression")

    def check_contents(self):
        # Gem contents never match the gem name; always a bomb.
        self.check_included_archives()
        self.content_type = BOMB
+
+
class GemMetadataExtractor(CompressionExtractor):
    """Extract a gem's metadata.gz as a single text file; used for
    --metadata."""
    file_type = 'Ruby gem'

    def prepare(self):
        self.pipe(['tar', '-xO', 'metadata.gz'], "metadata.gz extraction")
        self.pipe(['zcat'], "metadata.gz decompression")

    def basename(self):
        return os.path.basename(self.filename) + '-metadata.txt'
+
+
class NoPipeExtractor(BaseExtractor):
    # Some extraction tools won't accept the archive from stdin.  With
    # these, the piping infrastructure we normally set up generally doesn't
    # work, at least at first.  We can still use most of it; we just don't
    # want to seed self.archive with the archive file, since that sucks up
    # memory.  So instead we seed it with /dev/null, and specify the
    # filename on the command line as necessary.  We also open the actual
    # file with os.open, to make sure we can actually do it (permissions
    # are good, etc.).  This class doesn't do anything by itself; it's just
    # meant to be a base class for extractors that rely on these dumb
    # tools.
    def __init__(self, filename, encoding):
        # Open/close purely as a readability check; the tool reopens the
        # file itself via the command line.
        os.close(os.open(filename, os.O_RDONLY))
        BaseExtractor.__init__(self, '/dev/null', None)
        self.filename = os.path.realpath(filename)

    def extract_archive(self):
        # Append the real filename to the subclass's command.
        self.extract_pipe = self.extract_command + [self.filename]
        BaseExtractor.extract_archive(self)

    def get_filenames(self):
        self.list_pipe = self.list_command + [self.filename]
        return BaseExtractor.get_filenames(self)
+
+
class ZipExtractor(NoPipeExtractor):
    """Extractor for Zip archives, via unzip/zipinfo."""
    file_type = 'Zip file'
    extract_command = ['unzip', '-q']
    list_command = ['zipinfo', '-1']
+
+
class SevenExtractor(NoPipeExtractor):
    """Extractor for 7z archives.  '7z l' prints a table delimited by
    dashed border lines; filenames start after the last space of the
    first border line, so the border is used to find the column."""
    file_type = '7z file'
    extract_command = ['7z', 'x']
    list_command = ['7z', 'l']
    border_re = re.compile('^[- ]+$')

    def get_filenames(self):
        fn_index = None
        for line in NoPipeExtractor.get_filenames(self):
            if self.border_re.match(line):
                if fn_index is not None:
                    # Second border: end of the file table.
                    break
                else:
                    # First border: locate the filename column.
                    fn_index = line.rindex(' ') + 1
            elif fn_index is not None:
                yield line[fn_index:]
        self.archive.close()
+
+
class CABExtractor(NoPipeExtractor):
    """Extractor for Microsoft CAB archives, via cabextract.  The
    listing's filename column is the third ' | '-separated field after
    the border line."""
    file_type = 'CAB archive'
    extract_command = ['cabextract', '-q']
    list_command = ['cabextract', '-l']
    border_re = re.compile(r'^[-\+]+$')

    def get_filenames(self):
        fn_index = None
        filenames = NoPipeExtractor.get_filenames(self)
        # Skip the header up to (and including) the border line.
        for line in filenames:
            if self.border_re.match(line):
                break
        for line in filenames:
            try:
                yield line.split(' | ', 2)[2]
            except IndexError:
                # Ran out of table rows.
                break
        self.archive.close()
+
+
class ShieldExtractor(NoPipeExtractor):
    """Extractor for InstallShield archives, via unshield.  Listing rows
    start with an index number; a dashed summary row ends the table."""
    file_type = 'InstallShield archive'
    extract_command = ['unshield', 'x']
    list_command = ['unshield', 'l']
    prefix_re = re.compile(r'^\s+\d+\s+')
    end_re = re.compile(r'^\s+-+\s+-+\s*$')

    def get_filenames(self):
        for line in NoPipeExtractor.get_filenames(self):
            if self.end_re.match(line):
                break
            else:
                match = self.prefix_re.match(line)
                if match:
                    # Strip the leading index column.
                    yield line[match.end():]
        self.archive.close()

    def basename(self):
        # Drop the '.hdr' extension the generic rule doesn't know about.
        result = NoPipeExtractor.basename(self)
        if result.endswith('.hdr'):
            result = result[:-4]
        return result
+
+
class RarExtractor(NoPipeExtractor):
    """Extractor for RAR archives, via unrar.  The listing's file table
    sits between two all-dash border lines; the filename is the second
    space-separated field of each row."""
    file_type = 'RAR archive'
    extract_command = ['unrar', 'x']
    list_command = ['unrar', 'l']
    border_re = re.compile('^-+$')

    def get_filenames(self):
        inside = False
        for line in NoPipeExtractor.get_filenames(self):
            if self.border_re.match(line):
                if inside:
                    break
                else:
                    inside = True
            elif inside:
                yield line.split(' ')[1]
        self.archive.close()
+
+
class BaseHandler(object):
    """Base class for post-extraction organizers.

    handle() first makes everything user-readable/traversable, then
    delegates to the subclass's organize() to move the extracted tree to
    its final location.  Methods return an error string on failure, or
    None on success.
    """
    def __init__(self, extractor, options):
        self.extractor = extractor
        self.options = options
        self.target = None

    def handle(self):
        # Fix permissions: u+rwx on directories first (so chmod -R can
        # descend), then u+rwX across the whole tree.
        command = 'find'
        status = subprocess.call(['find', self.extractor.target, '-type', 'd',
                                  '-exec', 'chmod', 'u+rwx', '{}', ';'])
        if status == 0:
            command = 'chmod'
            status = subprocess.call(['chmod', '-R', 'u+rwX',
                                      self.extractor.target])
        if status != 0:
            return "%s returned with exit status %s" % (command, status)
        return self.organize()

    def set_target(self, target, checker):
        # Reserve a free name based on the desired one; warn if the name
        # we actually got differs.
        self.target = checker(target).check()
        if self.target != target:
            logger.warning("extracting %s to %s" %
                           (self.extractor.filename, self.target))
+
+
+# The "where to extract" table, with options and archive types.
+# This dictates the contents of each can_handle method.
+#
+# Flat Overwrite None
+# File basename basename FilenameChecked
+# Match . . tempdir + checked
+# Bomb . basename DirectoryChecked
+
class FlatHandler(BaseHandler):
    """Merge the extracted tree directly into the current directory
    (--flat, or --overwrite of a matching directory)."""
    def can_handle(contents, options):
        return ((options.flat and (contents != ONE_ENTRY_KNOWN)) or
                (options.overwrite and (contents == MATCHING_DIRECTORY)))
    can_handle = staticmethod(can_handle)

    def organize(self):
        self.target = '.'
        # Bottom-up walk so each directory is empty by the time we
        # os.rmdir() it.
        for curdir, dirs, filenames in os.walk(self.extractor.target,
                                               topdown=False):
            # Remove the temporary-directory component from the path to
            # get the destination directory.
            path_parts = curdir.split(os.sep)
            if path_parts[0] == '.':
                del path_parts[1]
            else:
                del path_parts[0]
            newdir = os.path.join(*path_parts)
            if not os.path.isdir(newdir):
                os.makedirs(newdir)
            for filename in filenames:
                os.rename(os.path.join(curdir, filename),
                          os.path.join(newdir, filename))
            os.rmdir(curdir)
+
+
class OverwriteHandler(BaseHandler):
    """Move the extracted tree to the archive's basename, replacing any
    existing directory of that name."""
    def can_handle(contents, options):
        return ((options.flat and (contents == ONE_ENTRY_KNOWN)) or
                (options.overwrite and (contents != MATCHING_DIRECTORY)))
    can_handle = staticmethod(can_handle)

    def organize(self):
        self.target = self.extractor.basename()
        if os.path.isdir(self.target):
            shutil.rmtree(self.target)
        os.rename(self.extractor.target, self.target)
+
+
class MatchHandler(BaseHandler):
    """Promote a single extracted entry out of the temporary directory,
    either under its own name (EXTRACT_HERE) or renamed to the archive's
    basename."""
    def can_handle(contents, options):
        return ((contents == MATCHING_DIRECTORY) or
                ((contents in ONE_ENTRY_UNKNOWN) and
                 options.one_entry_policy.ok_for_match()))
    can_handle = staticmethod(can_handle)

    def organize(self):
        source = os.path.join(self.extractor.target,
                              os.listdir(self.extractor.target)[0])
        if os.path.isdir(source):
            checker = DirectoryChecker
        else:
            checker = FilenameChecker
        if self.options.one_entry_policy == EXTRACT_HERE:
            # Keep the entry's own name (content_name may carry a
            # trailing '/' marking a directory).
            destination = self.extractor.content_name.rstrip('/')
        else:
            destination = self.extractor.basename()
        self.set_target(destination, checker)
        if os.path.isdir(self.extractor.target):
            os.rename(source, self.target)
            os.rmdir(self.extractor.target)
        else:
            os.rename(self.extractor.target, self.target)
        self.extractor.included_root = './'
+
+
class EmptyHandler(object):
    """No-op handler for archives that extracted to nothing."""
    target = ''

    def can_handle(contents, options):
        return contents == EMPTY
    can_handle = staticmethod(can_handle)

    def __init__(self, extractor, options): pass
    def handle(self): pass
+
+
class BombHandler(BaseHandler):
    """Fallback handler: wrap everything in a directory named after the
    archive.  Always applicable, so it is tried last."""
    def can_handle(contents, options):
        return True
    can_handle = staticmethod(can_handle)

    def organize(self):
        basename = self.extractor.basename()
        self.set_target(basename, self.extractor.name_checker)
        os.rename(self.extractor.target, self.target)
+
+
class BasePolicy(object):
    """Base class for interactive decision policies.

    Subclasses define 'answers' (answer key -> policy constant, with ''
    as the default), 'choices' (menu lines), and 'prompt'.  In batch
    mode the default answer becomes a permanent policy and no question
    is asked.
    """
    # Wrap question text to the terminal width (COLUMNS if sane, else 80).
    try:
        width = int(os.environ['COLUMNS'])
    except (KeyError, ValueError):
        width = 80
    wrapper = textwrap.TextWrapper(width=width - 1)

    def __init__(self, options):
        self.current_policy = None
        if options.batch:
            self.permanent_policy = self.answers['']
        else:
            self.permanent_policy = None

    def wrap(self, question, filename):
        # Note: This function assumes the filename is the first thing in the
        # question text, and that's the only place it appears.
        if len(self.wrapper.wrap(filename + ' a')) > 1:
            # Filename alone is too long to wrap with text; put it on
            # its own line and wrap the rest of the question.
            return [filename] + self.wrapper.wrap(question[3:])
        return self.wrapper.wrap(question % (filename,))

    def ask_question(self, question):
        # Repeat until the user gives a recognized answer; EOF selects
        # the default.
        question = question + self.choices
        while True:
            print "\n".join(question)
            try:
                answer = raw_input(self.prompt)
            except EOFError:
                return self.answers['']
            try:
                return self.answers[answer.lower()]
            except KeyError:
                print

    def __cmp__(self, other):
        # Policies compare as their currently chosen value.
        return cmp(self.current_policy, other)
+
+
class OneEntryPolicy(BasePolicy):
    """Decide what to do with an archive containing a single entry whose
    name doesn't match the archive: wrap it, rename it, or extract it
    here."""
    answers = {'h': EXTRACT_HERE, 'i': EXTRACT_WRAP, 'r': EXTRACT_RENAME,
               '': EXTRACT_WRAP}
    choices = ["You can:",
               " * extract it Inside another directory",
               " * extract it and Rename the directory",
               " * extract it Here"]
    prompt = "What do you want to do? (I/r/h) "

    def __init__(self, options):
        BasePolicy.__init__(self, options)
        # --flat implies 'here'; otherwise honor --one-entry, which may
        # be any unambiguous prefix of here/rename/inside.
        if options.flat:
            default = 'h'
        elif options.one_entry_default is not None:
            default = options.one_entry_default.lower()
        else:
            return
        if 'here'.startswith(default):
            self.permanent_policy = EXTRACT_HERE
        elif 'rename'.startswith(default):
            self.permanent_policy = EXTRACT_RENAME
        elif 'inside'.startswith(default):
            self.permanent_policy = EXTRACT_WRAP
        elif default is not None:
            raise ValueError("bad value %s for default policy" % (default,))

    def prep(self, archive_filename, extractor):
        # Build the question and settle current_policy, asking the user
        # only if no permanent policy is in force.
        question = self.wrap(("%%s contains one %s, but its name " +
                              "doesn't match.") %
                             (extractor.content_type,), archive_filename)
        question.append(" Expected: " + extractor.basename())
        question.append(" Actual: " + extractor.content_name)
        self.current_policy = (self.permanent_policy or
                               self.ask_question(question))

    def ok_for_match(self):
        return self.current_policy in (EXTRACT_RENAME, EXTRACT_HERE)
+
+
class RecursionPolicy(BasePolicy):
    """Decide whether to extract archives found inside an archive."""
    answers = {'o': RECURSE_ONCE, 'a': RECURSE_ALWAYS, 'n': RECURSE_NOT_NOW,
               'v': RECURSE_NEVER, 'l': RECURSE_LIST, '': RECURSE_NOT_NOW}
    choices = ["You can:",
               " * Always extract included archives",
               " * extract included archives this Once",
               " * choose Not to extract included archives",
               " * neVer extract included archives",
               " * List included archives"]
    prompt = "What do you want to do? (a/o/N/v/l) "

    def __init__(self, options):
        BasePolicy.__init__(self, options)
        # -l never recurses; -r always does.
        if options.show_list:
            self.permanent_policy = RECURSE_NEVER
        elif options.recursive:
            self.permanent_policy = RECURSE_ALWAYS

    def prep(self, current_filename, target, extractor):
        archive_count = len(extractor.included_archives)
        if (self.permanent_policy is not None) or (archive_count == 0):
            self.current_policy = self.permanent_policy or RECURSE_NOT_NOW
            return
        question = self.wrap(("%%s contains %s other archive file(s), " +
                              "out of %s file(s) total.") %
                             (archive_count, extractor.file_count),
                             current_filename)
        if target == '.':
            target = ''
        included_root = extractor.included_root
        if included_root == './':
            included_root = ''
        # 'l' lists the included archives and re-asks; other answers end
        # the loop.
        while True:
            self.current_policy = self.ask_question(question)
            if self.current_policy != RECURSE_LIST:
                break
            print ("\n%s\n" %
                   '\n'.join([os.path.join(target, included_root, filename)
                              for filename in extractor.included_archives]))
        # Always/neVer answers stick for the rest of the run.
        if self.current_policy in (RECURSE_ALWAYS, RECURSE_NEVER):
            self.permanent_policy = self.current_policy

    def ok_to_recurse(self):
        return self.current_policy in (RECURSE_ALWAYS, RECURSE_ONCE)
+
+
class ExtractorBuilder(object):
    """Map a filename to candidate extractors by mimetype, extension,
    and libmagic ('file' command) output, and instantiate them.

    The class-level tables below are derived once, at class-creation
    time, from extractor_map.
    """
    extractor_map = {'tar': {'extractor': TarExtractor,
                             'mimetypes': ('x-tar',),
                             'extensions': ('tar',),
                             'magic': ('POSIX tar archive',)},
                     'zip': {'extractor': ZipExtractor,
                             'mimetypes': ('zip',),
                             'extensions': ('zip',),
                             'magic': ('(Zip|ZIP self-extracting) archive',)},
                     'rpm': {'extractor': RPMExtractor,
                             'mimetypes': ('x-redhat-package-manager', 'x-rpm'),
                             'extensions': ('rpm',),
                             'magic': ('RPM',)},
                     'deb': {'extractor': DebExtractor,
                             'metadata': DebMetadataExtractor,
                             'mimetypes': ('x-debian-package',),
                             'extensions': ('deb',),
                             'magic': ('Debian binary package',)},
                     'cpio': {'extractor': CpioExtractor,
                              'mimetypes': ('x-cpio',),
                              'extensions': ('cpio',),
                              'magic': ('cpio archive',)},
                     'gem': {'extractor': GemExtractor,
                             'metadata': GemMetadataExtractor,
                             'mimetypes': ('x-ruby-gem',),
                             'extensions': ('gem',)},
                     '7z': {'extractor': SevenExtractor,
                            'mimetypes': ('x-7z-compressed',),
                            'extensions': ('7z',),
                            'magic': ('7-zip archive',)},
                     'cab': {'extractor': CABExtractor,
                             'mimetypes': ('x-cab',),
                             'extensions': ('cab',),
                             'magic': ('Microsoft Cabinet Archive',)},
                     'rar': {'extractor': RarExtractor,
                             'mimetypes': ('rar',),
                             'extensions': ('rar',),
                             'magic': ('RAR archive',)},
                     'shield': {'extractor': ShieldExtractor,
                                'mimetypes': ('x-cab',),
                                'extensions': ('cab', 'hdr'),
                                'magic': ('InstallShield CAB',)},
                     'compress': {'extractor': CompressionExtractor}
                     }

    # Derived lookup tables: mimetype -> archive type, compiled magic
    # regexp -> archive type, extension -> [(archive type, encoding)].
    mimetype_map = {}
    magic_mime_map = {}
    extension_map = {}
    for ext_name, ext_info in extractor_map.items():
        for mimetype in ext_info.get('mimetypes', ()):
            if '/' not in mimetype:
                mimetype = 'application/' + mimetype
            mimetype_map[mimetype] = ext_name
        for magic_re in ext_info.get('magic', ()):
            magic_mime_map[re.compile(magic_re)] = ext_name
        for extension in ext_info.get('extensions', ()):
            extension_map.setdefault(extension, []).append((ext_name, None))

    # Compound extensions that imply both an archive type and an encoding.
    for mapping in (('tar', 'bzip2', 'tar.bz2'),
                    ('tar', 'gzip', 'tar.gz', 'tgz'),
                    ('compress', 'gzip', 'Z', 'gz'),
                    ('compress', 'bzip2', 'bz2'),
                    ('compress', 'lzma', 'lzma')):
        for extension in mapping[2:]:
            extension_map.setdefault(extension, []).append(mapping[:2])

    # Compiled magic-output regexp -> encoding name.
    magic_encoding_map = {}
    for mapping in (('bzip2', 'bzip2 compressed'),
                    ('gzip', 'gzip compressed'),
                    ('lzma', 'LZMA compressed')):
        for pattern in mapping[1:]:
            magic_encoding_map[re.compile(pattern)] = mapping[0]

    def __init__(self, filename, options):
        self.filename = filename
        self.options = options

    def build_extractor(self, archive_type, encoding):
        # --metadata selects the 'metadata' extractor where one exists.
        extractors = self.extractor_map[archive_type]
        if self.options.metadata and extractors.has_key('metadata'):
            extractor = extractors['metadata']
        else:
            extractor = extractors['extractor']
        return extractor(self.filename, encoding)

    def get_extractor(self):
        """Yield candidate extractor instances, deduplicated, in order
        of detection confidence."""
        tried_types = set()
        # As smart as it is, the magic test can't go first, because at least
        # on my system it just recognizes gem files as tar files.  I guess
        # it's possible for the opposite problem to occur -- where the mimetype
        # or extension suggests something less than ideal -- but it seems less
        # likely so I'm sticking with this.
        for func_name in ('mimetype', 'extension', 'magic'):
            logger.debug("getting extractors by %s" % (func_name,))
            extractor_types = \
                    getattr(self, 'try_by_' + func_name)(self.filename)
            logger.debug("done getting extractors")
            for ext_args in extractor_types:
                if ext_args in tried_types:
                    continue
                tried_types.add(ext_args)
                logger.debug("trying %s extractor from %s" %
                             (ext_args, func_name))
                yield self.build_extractor(*ext_args)

    def try_by_mimetype(cls, filename):
        # Return [(archive_type, encoding)] guessed from the mimetype,
        # or a bare 'compress' candidate if only the encoding is known.
        mimetype, encoding = mimetypes.guess_type(filename)
        try:
            return [(cls.mimetype_map[mimetype], encoding)]
        except KeyError:
            if encoding:
                return [('compress', encoding)]
            return []
    try_by_mimetype = classmethod(try_by_mimetype)

    def magic_map_matches(cls, output, magic_map):
        # All map values whose compiled regexp matches the magic output.
        return [result for regexp, result in magic_map.items()
                if regexp.search(output)]
    magic_map_matches = classmethod(magic_map_matches)

    def try_by_magic(cls, filename):
        # Ask file(1) (with -z to look inside compressed files) and match
        # its description against the magic tables.
        process = subprocess.Popen(['file', '-z', filename],
                                   stdout=subprocess.PIPE)
        status = process.wait()
        if status != 0:
            return []
        output = process.stdout.readline()
        process.stdout.close()
        # Strip the 'filename: ' prefix from file's output.
        if output.startswith('%s: ' % filename):
            output = output[len(filename) + 2:]
        mimes = cls.magic_map_matches(output, cls.magic_mime_map)
        encodings = cls.magic_map_matches(output, cls.magic_encoding_map)
        if mimes and not encodings:
            encodings = [None]
        elif encodings and not mimes:
            mimes = ['compress']
        return [(m, e) for m in mimes for e in encodings]
    try_by_magic = classmethod(try_by_magic)

    def try_by_extension(cls, filename):
        # Look up the last two filename components ('tar.gz'), then just
        # the last one ('gz'), against extension_map.
        parts = filename.split('.')[-2:]
        results = []
        while parts:
            results.extend(cls.extension_map.get('.'.join(parts), []))
            del parts[0]
        return results
    try_by_extension = classmethod(try_by_extension)
+
+
class BaseAction(object):
    """Base class for the per-archive operations (extract or list)."""
    def __init__(self, options, filenames):
        self.options = options
        self.filenames = filenames
        self.target = None

    def report(self, function, *args):
        # Call function(*args) and normalize failure into an error
        # string: its own return value, or the text of any extraction
        # exception (with the traceback logged at debug level).
        try:
            error = function(*args)
        except EXTRACTION_ERRORS, exception:
            error = str(exception)
            logger.debug(''.join(traceback.format_exception(*sys.exc_info())))
        return error
+
+
class ExtractionAction(BaseAction):
    """Extract one archive and hand the result to the first handler
    whose can_handle() accepts the archive's content type."""
    # Ordered by specificity; BombHandler accepts anything, so it's last.
    handlers = [FlatHandler, OverwriteHandler, MatchHandler, EmptyHandler,
                BombHandler]

    def __init__(self, options, filenames):
        BaseAction.__init__(self, options, filenames)
        self.did_print = False

    def get_handler(self, extractor):
        # Single-entry archives may need the user's one-entry decision
        # before a handler can be chosen.
        if extractor.content_type in ONE_ENTRY_UNKNOWN:
            self.options.one_entry_policy.prep(self.current_filename,
                                               extractor)
        for handler in self.handlers:
            if handler.can_handle(extractor.content_type, self.options):
                logger.debug("using %s handler" % (handler.__name__,))
                self.current_handler = handler(extractor, self.options)
                break

    def show_extraction(self, extractor):
        # Print what was extracted (verbose mode only), walking
        # directories depth-first in sorted order.
        if self.options.log_level > logging.INFO:
            return
        elif self.did_print:
            print
        else:
            self.did_print = True
        print "%s:" % (self.current_filename,)
        if extractor.contents is None:
            print self.current_handler.target
            return
        # Reverse sort + pop() walks the names in forward order.
        def reverser(x, y):
            return cmp(y, x)
        if self.current_handler.target == '.':
            filenames = extractor.contents
            filenames.sort(reverser)
        else:
            filenames = [self.current_handler.target]
        pathjoin = os.path.join
        isdir = os.path.isdir
        while filenames:
            filename = filenames.pop()
            if isdir(filename):
                print "%s/" % (filename,)
                new_filenames = os.listdir(filename)
                new_filenames.sort(reverser)
                filenames.extend([pathjoin(filename, new_filename)
                                  for new_filename in new_filenames])
            else:
                print filename

    def run(self, filename, extractor):
        # Each step runs only if the previous one returned no error.
        self.current_filename = filename
        error = (self.report(extractor.extract) or
                 self.report(self.get_handler, extractor) or
                 self.report(self.current_handler.handle) or
                 self.report(self.show_extraction, extractor))
        if not error:
            self.target = self.current_handler.target
        return error
+
+
class ListAction(BaseAction):
    """List an archive's contents on stdout (dtrx -l)."""
    def __init__(self, options, filenames):
        BaseAction.__init__(self, options, filenames)
        self.count = 0

    def get_list(self, extractor):
        # Note: The reason I'm getting all the filenames up front is
        # because if we run into trouble partway through the archive, we'll
        # try another extractor.  So before we display anything we have to
        # be sure this one is successful.  We maybe don't have to be quite
        # this conservative but this is the easy way out for now.
        self.filelist = list(extractor.get_filenames())

    def show_list(self, filename):
        # Print a per-archive header only when listing several archives.
        self.count += 1
        if len(self.filenames) != 1:
            if self.count > 1:
                print
            print "%s:" % (filename,)
        print '\n'.join(self.filelist)

    def run(self, filename, extractor):
        return (self.report(self.get_list, extractor) or
                self.report(self.show_list, filename))
+
+
+class ExtractorApplication(object):
    def __init__(self, arguments):
        # Clean up partial extractions on INT/TERM; let SIGPIPE kill us
        # the default way so pipelines behave normally.
        for signal_num in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signal_num, self.abort)
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)
        self.parse_options(arguments)
        self.setup_logger()
        self.successes = []
        self.failures = []
+
    def clean_destination(self, dest_name):
        # Best-effort removal: unlink a file, fall back to rmtree when
        # the target turns out to be a directory; other errors are
        # deliberately ignored.
        try:
            os.unlink(dest_name)
        except OSError, error:
            if error.errno == errno.EISDIR:
                shutil.rmtree(dest_name, ignore_errors=True)
+
    def abort(self, signal_num, frame):
        """Signal handler: remove any partially extracted target, then
        exit with status 1."""
        # Ignore further signals of this kind while cleaning up.
        signal.signal(signal_num, signal.SIG_IGN)
        print
        logger.debug("traceback:\n" +
                     ''.join(traceback.format_stack(frame)).rstrip())
        logger.debug("got signal %s" % (signal_num,))
        try:
            basename = self.current_extractor.target
        except AttributeError:
            basename = None
        if basename is not None:
            logger.debug("cleaning up %s" % (basename,))
            # The partial target may be in the original cwd or in the
            # directory we moved into for recursive extraction.
            clean_targets = set([os.path.realpath('.')])
            if hasattr(self, 'current_directory'):
                clean_targets.add(os.path.realpath(self.current_directory))
            for directory in clean_targets:
                self.clean_destination(os.path.join(directory, basename))
        sys.exit(1)
+
+ def parse_options(self, arguments):
+ parser = optparse.OptionParser(
+ usage="%prog [options] archive [archive2 ...]",
+ description="Intelligent archive extractor",
+ version=VERSION_BANNER
+ )
+ parser.add_option('-l', '-t', '--list', '--table', dest='show_list',
+ action='store_true', default=False,
+ help="list contents of archives on standard output")
+ parser.add_option('-m', '--metadata', dest='metadata',
+ action='store_true', default=False,
+ help="extract metadata from a .deb/.gem")
+ parser.add_option('-r', '--recursive', dest='recursive',
+ action='store_true', default=False,
+ help="extract archives contained in the ones listed")
+ parser.add_option('--one', '--one-entry', dest='one_entry_default',
+ default=None,
+ help=("specify extraction policy for one-entry " +
+ "archives: inside/rename/here"))
+ parser.add_option('-n', '--noninteractive', dest='batch',
+ action='store_true', default=False,
+ help="don't ask how to handle special cases")
+ parser.add_option('-o', '--overwrite', dest='overwrite',
+ action='store_true', default=False,
+ help="overwrite any existing target output")
+ parser.add_option('-f', '--flat', '--no-directory', dest='flat',
+ action='store_true', default=False,
+ help="extract everything to the current directory")
+ parser.add_option('-v', '--verbose', dest='verbose',
+ action='count', default=0,
+ help="be verbose/print debugging information")
+ parser.add_option('-q', '--quiet', dest='quiet',
+ action='count', default=3,
+ help="suppress warning/error messages")
+ self.options, filenames = parser.parse_args(arguments)
+ if not filenames:
+ parser.error("you did not list any archives")
+ # This makes WARNING is the default.
+ self.options.log_level = (10 * (self.options.quiet -
+ self.options.verbose))
+ try:
+ self.options.one_entry_policy = OneEntryPolicy(self.options)
+ except ValueError:
+ parser.error("invalid value for --one-entry option")
+ self.options.recursion_policy = RecursionPolicy(self.options)
+ self.archives = {os.path.realpath(os.curdir): filenames}
+
+ def setup_logger(self):
+ logging.getLogger().setLevel(self.options.log_level)
+ handler = logging.StreamHandler()
+ handler.setLevel(self.options.log_level)
+ formatter = logging.Formatter("dtrx: %(levelname)s: %(message)s")
+ handler.setFormatter(formatter)
+ logger.addHandler(handler)
+ logger.debug("logger is set up")
+
+ def recurse(self, filename, extractor, action):
+ self.options.recursion_policy.prep(filename, action.target, extractor)
+ if self.options.recursion_policy.ok_to_recurse():
+ for filename in extractor.included_archives:
+ logger.debug("recursing with %s archive" %
+ (extractor.content_type,))
+ tail_path, basename = os.path.split(filename)
+ path_args = [self.current_directory, extractor.included_root,
+ tail_path]
+ logger.debug("included root: %s" % (extractor.included_root,))
+ logger.debug("tail path: %s" % (tail_path,))
+ if os.path.isdir(action.target):
+ logger.debug("action target: %s" % (action.target,))
+ path_args.insert(1, action.target)
+ directory = os.path.join(*path_args)
+ self.archives.setdefault(directory, []).append(basename)
+
+ def check_file(self, filename):
+ try:
+ result = os.stat(filename)
+ except OSError, error:
+ return error.strerror
+ if stat.S_ISDIR(result.st_mode):
+ return "cannot work with a directory"
+
+ def show_stderr(self, logger_func, stderr):
+ if stderr:
+ logger_func("Error output from this process:\n" +
+ stderr.rstrip('\n'))
+
+ def try_extractors(self, filename, builder):
+ errors = []
+ for extractor in builder:
+ self.current_extractor = extractor # For the abort() method.
+ error = self.action.run(filename, extractor)
+ if error:
+ errors.append((extractor.file_type, extractor.encoding, error,
+ extractor.get_stderr()))
+ if extractor.target is not None:
+ self.clean_destination(extractor.target)
+ else:
+ self.show_stderr(logger.warn, extractor.get_stderr())
+ self.recurse(filename, extractor, self.action)
+ return
+ logger.error("could not handle %s" % (filename,))
+ if not errors:
+ logger.error("not a known archive type")
+ return True
+ for file_type, encoding, error, stderr in errors:
+ message = ["treating as", file_type, "failed:", error]
+ if encoding:
+ message.insert(1, "%s-encoded" % (encoding,))
+ logger.error(' '.join(message))
+ self.show_stderr(logger.error, stderr)
+ return True
+
+ def run(self):
+ if self.options.show_list:
+ action = ListAction
+ else:
+ action = ExtractionAction
+ self.action = action(self.options, self.archives.values()[0])
+ while self.archives:
+ self.current_directory, self.filenames = self.archives.popitem()
+ os.chdir(self.current_directory)
+ for filename in self.filenames:
+ builder = ExtractorBuilder(filename, self.options)
+ error = (self.check_file(filename) or
+ self.try_extractors(filename, builder.get_extractor()))
+ if error:
+ if error != True:
+ logger.error("%s: %s" % (filename, error))
+ self.failures.append(filename)
+ else:
+ self.successes.append(filename)
+ self.options.one_entry_policy.permanent_policy = EXTRACT_WRAP
+ if self.failures:
+ return 1
+ return 0
+
+
+if __name__ == '__main__':
+ app = ExtractorApplication(sys.argv[1:])
+ sys.exit(app.run())