diff options
Diffstat (limited to 'pwnlib/libcdb.py')
-rw-r--r-- | pwnlib/libcdb.py | 309 |
1 files changed, 254 insertions, 55 deletions
diff --git a/pwnlib/libcdb.py b/pwnlib/libcdb.py index 932f295..0fb186b 100644 --- a/pwnlib/libcdb.py +++ b/pwnlib/libcdb.py @@ -4,22 +4,18 @@ Fetch a LIBC binary based on some heuristics. from __future__ import absolute_import from __future__ import division -import codecs -import json import os +import six import tempfile from pwnlib.context import context from pwnlib.elf import ELF from pwnlib.log import getLogger from pwnlib.tubes.process import process -from pwnlib.util.fiddling import b64d from pwnlib.util.fiddling import enhex -from pwnlib.util.fiddling import hexdump from pwnlib.util.misc import read from pwnlib.util.misc import which from pwnlib.util.misc import write -from pwnlib.util.safeeval import const from pwnlib.util.web import wget log = getLogger(__name__) @@ -62,40 +58,46 @@ def provider_libcdb(hex_encoded_id, hash_type): log.warn_once("Failed to fetch libc for %s %s from libcdb: %s", hash_type, hex_encoded_id, e) return data -# https://libc.rip/ -def provider_libc_rip(hex_encoded_id, hash_type): +def query_libc_rip(params): # Deferred import because it's slow import requests + url = "https://libc.rip/api/find" + try: + result = requests.post(url, json=params, timeout=20) + result.raise_for_status() + if result.status_code != 200: + log.debug("Error: %s", result.text) + return None + return result.json() + except requests.RequestException as e: + log.warn_once("Failed to fetch libc info from libc.rip: %s", e) + return None + +# https://libc.rip/ +def provider_libc_rip(hex_encoded_id, hash_type): # Build the request for the hash type # https://github.com/niklasb/libc-database/blob/master/searchengine/api.yml if hash_type == 'build_id': hash_type = 'buildid' - url = "https://libc.rip/api/find" params = {hash_type: hex_encoded_id} - data = b"" - try: - result = requests.post(url, json=params, timeout=20) - result.raise_for_status() - libc_match = result.json() - if not libc_match: - log.warn_once("Could not find libc for %s %s on libc.rip", hash_type, hex_encoded_id) - return None + libc_match = query_libc_rip(params) + if not libc_match: + log.warn_once("Could not find libc info for %s %s on libc.rip", hash_type, hex_encoded_id) + return None - if len(libc_match) > 1: - log.debug("Received multiple matches. Choosing the first match and discarding the others.") - log.debug("%r", libc_match) + if len(libc_match) > 1: + log.debug("Received multiple matches. Choosing the first match and discarding the others.") + log.debug("%r", libc_match) - url = libc_match[0]['download_url'] - log.debug("Downloading data from libc.rip: %s", url) - data = wget(url, timeout=20) + url = libc_match[0]['download_url'] + log.debug("Downloading data from libc.rip: %s", url) + data = wget(url, timeout=20) - if not data: - log.warn_once("Could not fetch libc for %s %s from libc.rip", hash_type, hex_encoded_id) - return None - except requests.RequestException as e: - log.warn_once("Failed to fetch libc for %s %s from libc.rip: %s", hash_type, hex_encoded_id, e) + if not data: + log.warn_once("Could not fetch libc binary for %s %s from libc.rip", hash_type, hex_encoded_id) + return None return data PROVIDERS = [provider_libcdb, provider_libc_rip] @@ -255,6 +257,211 @@ def unstrip_libc(filename): return True +def _extract_tarfile(cache_dir, data_filename, tarball): + from six import BytesIO + import tarfile + # Handle zstandard compression, since tarfile only supports gz, bz2, and xz. + if data_filename.endswith('.zst') or data_filename.endswith('.zstd'): + import zstandard + dctx = zstandard.ZstdDecompressor() + decompressed_tar = BytesIO() + dctx.copy_stream(tarball, decompressed_tar) + decompressed_tar.seek(0) + tarball.close() + tarball = decompressed_tar + + if six.PY2 and data_filename.endswith('.xz'): + # Python 2's tarfile doesn't support xz, so we need to decompress it first. + # Shell out to xz, since the Python 2 pylzma module is broken. + # (https://github.com/fancycode/pylzma/issues/67) + if not which('xz'): + log.error('Couldn\'t find "xz" in PATH. Please install xz first.') + import subprocess + try: + uncompressed_tarball = subprocess.check_output(['xz', '--decompress', '--stdout', tarball.name]) + tarball = BytesIO(uncompressed_tarball) + except subprocess.CalledProcessError: + log.error('Failed to decompress xz archive.') + + with tarfile.open(fileobj=tarball) as tar_file: + # Find the library folder in the archive (e.g. /lib/x86_64-linux-gnu/) + lib_dir = None + libc_name = None + for member in tar_file.getmembers(): + if not member.isfile(): + continue + libc_name = os.path.basename(member.name) + if libc_name == 'libc.so.6' or (libc_name.startswith('libc') and libc_name.endswith('.so')): + lib_dir = os.path.dirname(member.name) + break + else: + log.error('Couldn\'t find library folder containing the libc in the archive.') + + # Extract everything in the library folder + for member in tar_file.getmembers(): + if os.path.dirname(member.name) != lib_dir: + continue + if not member.isfile() and not member.issym(): + continue + # Extract while keeping file permissions + tar_file.extract(member, cache_dir) + + # Move the files up to the cache root + target_dir = os.path.join(cache_dir, lib_dir) + for file in os.listdir(target_dir): + os.rename(os.path.join(target_dir, file), os.path.join(cache_dir, file)) + os.removedirs(target_dir) + + return os.path.join(cache_dir, libc_name) + +def _extract_debfile(cache_dir, package_filename, package): + # Extract data.tar in the .deb archive. + if six.PY2: + if not which('ar'): + log.error('Missing command line tool "ar" to extract .deb archive. Please install "ar" first.') + + import atexit + import shutil + import subprocess + + # Use mkdtemp instead of TemporaryDirectory because the latter is not available in Python 2. + tempdir = tempfile.mkdtemp(prefix=".pwntools-tmp") + atexit.register(shutil.rmtree, tempdir) + with tempfile.NamedTemporaryFile(mode='wb', dir=tempdir) as debfile: + debfile.write(package) + debfile.flush() + try: + files_in_deb = subprocess.check_output(['ar', 't', debfile.name]).split(b'\n') + except subprocess.CalledProcessError: + log.error('Failed to list files in .deb archive.') + [data_filename] = filter(lambda f: f.startswith(b'data.tar'), files_in_deb) + + try: + subprocess.check_call(['ar', 'x', debfile.name, data_filename], cwd=tempdir) + except subprocess.CalledProcessError: + log.error('Failed to extract data.tar from .deb archive.') + + with open(os.path.join(tempdir, data_filename), 'rb') as tarball: + return _extract_tarfile(cache_dir, data_filename, tarball) + else: + import unix_ar + from six import BytesIO + ar_file = unix_ar.open(BytesIO(package)) + try: + data_filename = next(filter(lambda f: f.name.startswith(b'data.tar'), ar_file.infolist())).name.decode() + tarball = ar_file.open(data_filename) + return _extract_tarfile(cache_dir, data_filename, tarball) + finally: + ar_file.close() + +def _extract_pkgfile(cache_dir, package_filename, package): + from six import BytesIO + return _extract_tarfile(cache_dir, package_filename, BytesIO(package)) + +def _find_libc_package_lib_url(libc): + # Check https://libc.rip for the libc package + libc_match = query_libc_rip({'buildid': enhex(libc.buildid)}) + if libc_match is not None: + for match in libc_match: + yield match['libs_url'] + + # Check launchpad.net if it's an Ubuntu libc + # GNU C Library (Ubuntu GLIBC 2.36-0ubuntu4) + import re + version = re.search(br'GNU C Library \(Ubuntu E?GLIBC ([^\)]+)\)', libc.data) + if version is not None: + libc_version = version.group(1).decode() + yield 'https://launchpad.net/ubuntu/+archive/primary/+files/libc6_{}_{}.deb'.format(libc_version, libc.arch) + +def download_libraries(libc_path, unstrip=True): + """download_libraries(str, bool) -> str + Download the matching libraries for the given libc binary and cache + them in a local directory. The libraries are looked up using `libc.rip <https://libc.rip>`_ + and fetched from the official package repositories if available. + + This commonly includes the ``ld-linux-x86-64.so.2`` and ``libpthread.so.0`` binaries + which can be used to execute the program locally when the given libc is + incompatible with the local dynamic loader. + + Note: Only .deb and .pkg.tar.* packages are currently supported (Debian/Ubuntu, Arch). + + Arguments: + libc_path(str): + The path the libc binary. + unstrip(bool): + Try to fetch debug info for the libc and apply it to the downloaded file. + + Returns: + The path to the cached directory containing the downloaded libraries. + + Example: + >>> libc_path = ELF(which('ls'), checksec=False).libc.path + >>> lib_path = download_libraries(libc_path) + >>> lib_path is not None + True + >>> os.path.exists(os.path.join(lib_path, 'libc.so.6')) + True + >>> os.path.exists(os.path.join(lib_path, 'ld-linux-x86-64.so.2')) + True + """ + + libc = ELF(libc_path, checksec=False) + if not libc.buildid: + log.warn_once('Given libc does not have a buildid.') + return None + + # Handle caching and don't redownload if it already exists. + cache_dir = os.path.join(context.cache_dir, 'libcdb_libs') + if not os.path.isdir(cache_dir): + os.makedirs(cache_dir) + + cache_dir = os.path.join(cache_dir, enhex(libc.buildid)) + if os.path.exists(cache_dir): + return cache_dir + + for package_url in _find_libc_package_lib_url(libc): + extension_handlers = { + '.deb': _extract_debfile, + '.pkg.tar.xz': _extract_pkgfile, + '.pkg.tar.zst': _extract_pkgfile, + } + + package_filename = os.path.basename(package_url) + for extension, handler in extension_handlers.items(): + if package_filename.endswith(extension): + break + else: + log.failure('Cannot handle %s (%s)', package_filename, package_url) + continue + + # Download the package + package = wget(package_url, timeout=20) + if not package: + continue + + # Create target cache directory to extract files into + if not os.path.isdir(cache_dir): + os.makedirs(cache_dir) + + try: + # Extract the archive + libc_path = handler(cache_dir, package_filename, package) + except Exception as e: + os.removedirs(cache_dir) + log.failure('Failed to extract %s: %s', package_filename, e) + continue + # Unstrip the libc binary + try: + if unstrip: + unstrip_libc(libc_path) + except Exception: + pass + + return cache_dir + + log.warn_once('Failed to find matching libraries for provided libc.') + return None + def _handle_multiple_matching_libcs(matching_libcs): from pwnlib.term import text from pwnlib.ui import options @@ -311,41 +518,33 @@ def search_by_symbol_offsets(symbols, select_index=None, unstrip=True, return_as >>> for buildid in matched_libcs: # doctest +SKIP ... libc = ELF(search_by_build_id(buildid)) # doctest +SKIP """ - import requests for symbol, address in symbols.items(): if isinstance(address, int): symbols[symbol] = hex(address) - try: - params = {'symbols': symbols} - url = "https://libc.rip/api/find" - log.debug('Request: %s', params) - result = requests.post(url, json=params, timeout=20) - result.raise_for_status() - matching_libcs = result.json() - log.debug('Result: %s', matching_libcs) - if len(matching_libcs) == 0: - log.warn_once("No matching libc for symbols %r on libc.rip", symbols) - return None + params = {'symbols': symbols} + log.debug('Request: %s', params) + matching_libcs = query_libc_rip(params) + log.debug('Result: %s', matching_libcs) + if matching_libcs is None or len(matching_libcs) == 0: + log.warn_once("No matching libc for symbols %r on libc.rip", symbols) + return None - if return_as_list: - return [libc['buildid'] for libc in matching_libcs] + if return_as_list: + return [libc['buildid'] for libc in matching_libcs] - if len(matching_libcs) == 1: - return search_by_build_id(matching_libcs[0]['buildid'], unstrip=unstrip) + if len(matching_libcs) == 1: + return search_by_build_id(matching_libcs[0]['buildid'], unstrip=unstrip) - if select_index is not None: - if select_index > 0 and select_index <= len(matching_libcs): - return search_by_build_id(matching_libcs[select_index - 1]['buildid'], unstrip=unstrip) - else: - log.error('Invalid selected libc index. %d is not in the range of 1-%d.', select_index, len(matching_libcs)) - return None + if select_index is not None: + if select_index > 0 and select_index <= len(matching_libcs): + return search_by_build_id(matching_libcs[select_index - 1]['buildid'], unstrip=unstrip) + else: + log.error('Invalid selected libc index. %d is not in the range of 1-%d.', select_index, len(matching_libcs)) + return None - selected_libc = _handle_multiple_matching_libcs(matching_libcs) - return search_by_build_id(selected_libc['buildid'], unstrip=unstrip) - except requests.RequestException as e: - log.warn_once("Failed to lookup libc for symbols %r from libc.rip: %s", symbols, e) - return None + selected_libc = _handle_multiple_matching_libcs(matching_libcs) + return search_by_build_id(selected_libc['buildid'], unstrip=unstrip) def search_by_build_id(hex_encoded_id, unstrip=True): """ @@ -503,4 +702,4 @@ def get_build_id_offsets(): }.get(context.arch, []) -__all__ = ['get_build_id_offsets', 'search_by_build_id', 'search_by_sha1', 'search_by_sha256', 'search_by_md5', 'unstrip_libc', 'search_by_symbol_offsets'] +__all__ = ['get_build_id_offsets', 'search_by_build_id', 'search_by_sha1', 'search_by_sha256', 'search_by_md5', 'unstrip_libc', 'search_by_symbol_offsets', 'download_libraries'] |