Diffstat (limited to 'debian/patches/support_jessie_upgrade.diff')
-rw-r--r-- | debian/patches/support_jessie_upgrade.diff | 876
1 file changed, 876 insertions, 0 deletions
diff --git a/debian/patches/support_jessie_upgrade.diff b/debian/patches/support_jessie_upgrade.diff new file mode 100644 index 0000000..4929879 --- /dev/null +++ b/debian/patches/support_jessie_upgrade.diff @@ -0,0 +1,876 @@ +From fb58feac206d5a1987b8c7f74e5ee89b18f87dc6 Mon Sep 17 00:00:00 2001 +From: Nikolaus Rath <Nikolaus@rath.org> +Date: Tue, 23 Feb 2016 15:44:51 -0800 +Subject: Allow upgrade of file systems created with jessie's s3ql + +Origin: debian +Forwarded: not-needed +Patch-Name: support_jessie_upgrade.diff + +Upstream has dropped support for upgrading file systems created with +the S3QL version in jessie. This patch forward-ports this capability. +It is based on upstream's Mercurial commit 773931c43368. +--- + src/s3ql/adm.py | 205 +++++++++++++++++++++-- + src/s3ql/backends/comprenc.py | 371 ++++++++++++++++++++++++++++++++++++++---- + src/s3ql/backends/local.py | 12 +- + src/s3ql/backends/s3c.py | 29 +++- + src/s3ql/upgrade_support.py | 75 +++++++++ + 5 files changed, 647 insertions(+), 45 deletions(-) + create mode 100644 src/s3ql/upgrade_support.py + +diff --git a/src/s3ql/adm.py b/src/s3ql/adm.py +index d0e67..3b5d9 100644 +--- a/src/s3ql/adm.py ++++ b/src/s3ql/adm.py +@@ -7,19 +7,23 @@ This work can be distributed under the terms of the GNU GPLv3. + ''' + + from .logging import logging, QuietError, setup_logging +-from . import CURRENT_FS_REV, REV_VER_MAP ++from . import CURRENT_FS_REV, REV_VER_MAP, BUFSIZE + from .backends.comprenc import ComprencBackend + from .database import Connection + from .deltadump import TIME, INTEGER + from .common import (get_backend_cachedir, get_seq_no, is_mounted, get_backend, +- load_params, save_params) ++ load_params, save_params, AsyncFn, get_backend_factory, ++ pretty_print_size, split_by_n, handle_on_return) + from .metadata import dump_and_upload_metadata, download_metadata + from . import metadata + from .parse_args import ArgumentParser + from datetime import datetime as Datetime + from getpass import getpass + from contextlib import contextmanager ++from base64 import b64encode ++from queue import Queue, Full as QueueFull + import os ++import tempfile + import shutil + import functools + import sys +@@ -86,12 +90,11 @@ def main(args=None): + if options.action == 'clear': + with get_backend(options, raw=True) as backend: + return clear(backend, options) ++ elif options.action == 'upgrade': ++ return upgrade(options) + + with get_backend(options) as backend: +- if options.action == 'upgrade': +- return upgrade(backend, get_backend_cachedir(options.storage_url, +- options.cachedir)) +- elif options.action == 'passphrase': ++ if options.action == 'passphrase': + return change_passphrase(backend) + + elif options.action == 'download-metadata': +@@ -206,11 +209,24 @@ def get_old_rev_msg(rev, prog): + ''' % { 'version': REV_VER_MAP[rev], + 'prog': prog }) + +-def upgrade(backend, cachepath): ++ ++@handle_on_return ++def upgrade(options, on_return): + '''Upgrade file system to newest revision''' + + log.info('Getting file system parameters..') + ++ from . 
import backends ++ backends.local.SUPPORT_LEGACY_FORMAT = True ++ backends.s3c.SUPPORT_LEGACY_FORMAT = True ++ backends.comprenc.SUPPORT_LEGACY_FORMAT = True ++ ++ cachepath = get_backend_cachedir(options.storage_url, options.cachedir) ++ ++ backend_factory = get_backend_factory(options.storage_url, options.backend_options, ++ options.authfile) ++ backend = on_return.enter_context(backend_factory()) ++ + # Check for cached metadata + db = None + seq_no = get_seq_no(backend) +@@ -249,7 +265,7 @@ def upgrade(backend, cachepath): + raise QuietError() + + # Check revision +- if param['revision'] < CURRENT_FS_REV-1: ++ if param['revision'] < 21: + print(textwrap.dedent(''' + File system revision too old to upgrade! + +@@ -285,13 +301,31 @@ def upgrade(backend, cachepath): + with monkeypatch_metadata_retrieval(): + db = download_metadata(backend, cachepath + '.db') + +- log.info('Upgrading from revision %d to %d...', param['revision'], CURRENT_FS_REV) +- +- param['revision'] = CURRENT_FS_REV + param['last-modified'] = time.time() + param['seq_no'] += 1 ++ ++ if param['revision'] == 21: ++ log.info('Upgrading from revision %d to %d...', 21, 22) ++ param['revision'] = 22 ++ ++ # Ensure that there are backups of the master key ++ if backend.passphrase is not None: ++ data_pw = backend.passphrase ++ backend.passphrase = backend.fs_passphrase ++ for i in range(1,4): ++ obj_id = 's3ql_passphrase_bak%d' % i ++ if obj_id not in backend: ++ backend[obj_id] = data_pw ++ backend.passphrase = data_pw ++ ++ # Upgrade to revision 22 ++ update_obj_metadata(backend, backend_factory, db, options.threads) ++ updated_from_21 = True ++ else: ++ updated_from_21 = False + +- # Upgrade ++ log.info('Upgrading from revision %d to %d...', 22, 23) ++ param['revision'] = 23 + for name in ('atime', 'mtime', 'ctime'): + db.execute('ALTER TABLE inodes ADD COLUMN {time}_ns ' + 'INT NOT NULL DEFAULT 0'.format(time=name)) +@@ -308,6 +342,14 @@ def upgrade(backend, cachepath): + + print('File system upgrade complete.') + ++ if updated_from_21 and backend.passphrase is not None: ++ print('\nPlease store the following master key in a safe location. It allows ', ++ 'decryption of the S3QL file system in case the storage objects holding ', ++ 'this information get corrupted:', ++ '---BEGIN MASTER KEY---', ++ ' '.join(split_by_n(b64encode(backend.passphrase).decode(), 4)), ++ '---END MASTER KEY---', ++ sep='\n') + + @contextmanager + def monkeypatch_metadata_retrieval(): +@@ -351,5 +393,144 @@ def monkeypatch_metadata_retrieval(): + metadata.DUMP_SPEC[2] = DUMP_SPEC_bak + metadata.create_tables = create_tables_bak + ++def update_obj_metadata(backend, backend_factory, db, thread_count): ++ '''Upgrade metadata of storage objects''' ++ ++ plain_backend = backend.backend ++ ++ # No need to update sequence number, since we are going to ++ # write out a new one after the upgrade. 
++ if backend.passphrase is None: ++ extra_objects = { 's3ql_metadata' } ++ else: ++ extra_objects = { 's3ql_metadata', ++ 's3ql_passphrase', 's3ql_passphrase_bak1', ++ 's3ql_passphrase_bak2', 's3ql_passphrase_bak3' } ++ ++ for i in range(30): ++ obj_id = 's3ql_metadata_bak_%d' % i ++ if obj_id in plain_backend: ++ extra_objects.add(obj_id) ++ ++ def yield_objects(): ++ for (id_,) in db.query('SELECT id FROM objects'): ++ yield 's3ql_data_%d' % id_ ++ for obj_id in extra_objects: ++ yield obj_id ++ total = db.get_val('SELECT COUNT(id) FROM objects') + len(extra_objects) ++ ++ queue = Queue(maxsize=thread_count) ++ threads = [] ++ for _ in range(thread_count): ++ t = AsyncFn(upgrade_loop, queue, backend_factory) ++ # Don't wait for worker threads, gives deadlock if main thread ++ # terminates with exception ++ t.daemon = True ++ t.start() ++ threads.append(t) ++ ++ # Updating this value is prone to race conditions. However, ++ # we don't care because this is for an approximate progress ++ # output only. ++ queue.rewrote_size = 0 ++ stamp = 0 ++ for (i, obj_id) in enumerate(yield_objects()): ++ stamp2 = time.time() ++ if stamp2 - stamp > 1: ++ sys.stdout.write('\r..processed %d/%d objects (%.1f%%, %s rewritten)..' ++ % (i, total, i/total*100, ++ pretty_print_size(queue.rewrote_size))) ++ sys.stdout.flush() ++ stamp = stamp2 ++ ++ # Terminate early if any thread failed with an exception ++ for t in threads: ++ if not t.is_alive(): ++ t.join_and_raise() ++ ++ # Avoid blocking if all threads terminated ++ while True: ++ try: ++ queue.put(obj_id, timeout=1) ++ except QueueFull: ++ pass ++ else: ++ break ++ for t in threads: ++ if not t.is_alive(): ++ t.join_and_raise() ++ ++ queue.maxsize += len(threads) ++ for t in threads: ++ queue.put(None) ++ ++ for t in threads: ++ t.join_and_raise() ++ ++ sys.stdout.write('\n') ++ ++def upgrade_loop(queue, backend_factory): ++ ++ with backend_factory() as backend: ++ plain_backend = backend.backend ++ while True: ++ obj_id = queue.get() ++ if obj_id is None: ++ break ++ ++ meta = plain_backend.lookup(obj_id) ++ if meta.get('format_version', 0) == 2: ++ continue ++ ++ # For important objects, we make a copy first (just to be safe) ++ if not obj_id.startswith('s3ql_data'): ++ plain_backend.copy(obj_id, 's3ql_pre2.13' + obj_id[4:]) ++ ++ # When reading passphrase objects, we have to use the ++ # "outer" password ++ if obj_id.startswith('s3ql_passphrase'): ++ data_pw = backend.passphrase ++ backend.passphrase = backend.fs_passphrase ++ ++ meta = backend._convert_legacy_metadata(meta) ++ if meta['encryption'] == 'AES': ++ # Two statements to reduce likelihood of update races ++ size = rewrite_legacy_object(backend, obj_id) ++ queue.rewrote_size += size ++ else: ++ plain_backend.update_meta(obj_id, meta) ++ ++ if obj_id.startswith('s3ql_passphrase'): ++ backend.passphrase = data_pw ++ ++def rewrite_legacy_object(backend, obj_id): ++ with tempfile.TemporaryFile() as tmpfh: ++ ++ # Read object ++ def do_read(fh): ++ tmpfh.seek(0) ++ tmpfh.truncate() ++ while True: ++ buf = fh.read(BUFSIZE) ++ if not buf: ++ break ++ tmpfh.write(buf) ++ return fh.metadata ++ ++ meta = backend.perform_read(do_read, obj_id) ++ ++ # Write object ++ def do_write(fh): ++ tmpfh.seek(0) ++ while True: ++ buf = tmpfh.read(BUFSIZE) ++ if not buf: ++ break ++ fh.write(buf) ++ return fh ++ out_fh = backend.perform_write(do_write, obj_id, meta) ++ ++ return out_fh.get_obj_size() ++ + if __name__ == '__main__': + main(sys.argv[1:]) +diff --git a/src/s3ql/backends/comprenc.py 
b/src/s3ql/backends/comprenc.py +index 58e90..285f5 100644 +--- a/src/s3ql/backends/comprenc.py ++++ b/src/s3ql/backends/comprenc.py +@@ -25,6 +25,11 @@ import zlib + + log = logging.getLogger(__name__) + ++from ..upgrade_support import safe_unpickle, pickle ++from base64 import b64decode, b64encode ++import binascii ++SUPPORT_LEGACY_FORMAT=False ++ + HMAC_SIZE = 32 + + def sha256(s): +@@ -67,6 +72,8 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta): + @copy_ancestor_docstring + def lookup(self, key): + meta_raw = self.backend.lookup(key) ++ if SUPPORT_LEGACY_FORMAT and meta_raw.get('format_version', 0) < 2: ++ meta_raw = self._convert_legacy_metadata(meta_raw) + return self._verify_meta(key, meta_raw)[1] + + @prepend_ancestor_docstring +@@ -147,43 +154,80 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta): + """ + + fh = self.backend.open_read(key) ++ checksum_warning = False + try: +- meta_raw = fh.metadata +- (nonce, meta) = self._verify_meta(key, meta_raw) +- if nonce: +- data_key = sha256(self.passphrase + nonce) +- +- # The `payload_offset` key only exists if the storage object was +- # created with on old S3QL version. In order to avoid having to +- # download and re-upload the entire object during the upgrade, the +- # upgrade procedure adds this header to tell us how many bytes at +- # the beginning of the object we have to skip to get to the payload. +- if 'payload_offset' in meta_raw: +- to_skip = meta_raw['payload_offset'] +- while to_skip: +- to_skip -= len(fh.read(to_skip)) +- +- encr_alg = meta_raw['encryption'] +- if encr_alg == 'AES_v2': +- fh = DecryptFilter(fh, data_key) +- elif encr_alg != 'None': +- raise RuntimeError('Unsupported encryption: %s' % encr_alg) +- +- compr_alg = meta_raw['compression'] +- if compr_alg == 'BZIP2': +- fh = DecompressFilter(fh, bz2.BZ2Decompressor()) +- elif compr_alg == 'LZMA': +- fh = DecompressFilter(fh, lzma.LZMADecompressor()) +- elif compr_alg == 'ZLIB': +- fh = DecompressFilter(fh,zlib.decompressobj()) +- elif compr_alg != 'None': +- raise RuntimeError('Unsupported compression: %s' % compr_alg) ++ if SUPPORT_LEGACY_FORMAT: ++ if fh.metadata.get('format_version', 0) < 2: ++ meta_raw = self._convert_legacy_metadata(fh.metadata) ++ else: ++ meta_raw = fh.metadata ++ (nonce, meta) = self._verify_meta(key, meta_raw) ++ if nonce: ++ data_key = sha256(self.passphrase + nonce) ++ compr_alg = meta_raw['compression'] ++ encr_alg = meta_raw['encryption'] ++ if compr_alg == 'BZIP2': ++ decompressor = bz2.BZ2Decompressor() ++ elif compr_alg == 'LZMA': ++ decompressor = lzma.LZMADecompressor() ++ elif compr_alg == 'ZLIB': ++ decompressor = zlib.decompressobj() ++ elif compr_alg == 'None': ++ decompressor = None ++ else: ++ raise RuntimeError('Unsupported compression: %s' % compr_alg) ++ if 'payload_offset' in meta_raw: ++ to_skip = meta_raw['payload_offset'] ++ while to_skip: ++ to_skip -= len(fh.read(to_skip)) ++ checksum_warning = True ++ if encr_alg == 'AES': ++ fh = LegacyDecryptDecompressFilter(fh, data_key, decompressor) ++ decompressor = None ++ elif encr_alg == 'AES_v2': ++ fh = DecryptFilter(fh, data_key) ++ elif encr_alg != 'None': ++ raise RuntimeError('Unsupported encryption: %s' % encr_alg) ++ if decompressor: ++ fh = DecompressFilter(fh, decompressor) ++ else: ++ meta_raw = fh.metadata ++ ++ (nonce, meta) = self._verify_meta(key, meta_raw) ++ if nonce: ++ data_key = sha256(self.passphrase + nonce) ++ ++ # The `payload_offset` key only exists if the storage object was ++ # created with on old S3QL 
version. In order to avoid having to ++ # download and re-upload the entire object during the upgrade, the ++ # upgrade procedure adds this header to tell us how many bytes at ++ # the beginning of the object we have to skip to get to the payload. ++ if 'payload_offset' in meta_raw: ++ to_skip = meta_raw['payload_offset'] ++ while to_skip: ++ to_skip -= len(fh.read(to_skip)) ++ ++ encr_alg = meta_raw['encryption'] ++ if encr_alg == 'AES_v2': ++ fh = DecryptFilter(fh, data_key) ++ elif encr_alg != 'None': ++ raise RuntimeError('Unsupported encryption: %s' % encr_alg) ++ ++ compr_alg = meta_raw['compression'] ++ if compr_alg == 'BZIP2': ++ fh = DecompressFilter(fh, bz2.BZ2Decompressor()) ++ elif compr_alg == 'LZMA': ++ fh = DecompressFilter(fh, lzma.LZMADecompressor()) ++ elif compr_alg == 'ZLIB': ++ fh = DecompressFilter(fh,zlib.decompressobj()) ++ elif compr_alg != 'None': ++ raise RuntimeError('Unsupported compression: %s' % compr_alg) + + fh.metadata = meta + except: + # Don't emit checksum warning, caller hasn't even + # started reading anything. +- fh.close(checksum_warning=False) ++ fh.close(checksum_warning=checksum_warning) + raise + + return fh +@@ -275,6 +319,8 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta): + + def _copy_or_rename(self, src, dest, rename, metadata=None): + meta_raw = self.backend.lookup(src) ++ if SUPPORT_LEGACY_FORMAT and meta_raw.get('format_version', 0) < 2: ++ meta_raw = self._convert_legacy_metadata(meta_raw) + (nonce, meta_old) = self._verify_meta(src, meta_raw) + + if nonce: +@@ -303,6 +349,165 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta): + def close(self): + self.backend.close() + ++ def _convert_legacy_metadata(self, meta): ++ '''Convert metadata to newest format ++ ++ This method ensures that we can read objects written ++ by older S3QL versions. 
++ ''' ++ ++ format_version = meta.get('format_version', 0) ++ assert format_version in (0,1) ++ if format_version == 0: ++ meta = self._convert_legacy_metadata0(meta) ++ return self._convert_legacy_metadata1(meta) ++ ++ def _convert_legacy_metadata0(self, meta, ++ LEN_BYTES = struct.calcsize(b'<B'), ++ TIME_BYTES = struct.calcsize(b'<f')): ++ meta_new = dict(format_version=1) ++ ++ if ('encryption' in meta and ++ 'compression' in meta): ++ meta_new['encryption'] = meta['encryption'] ++ meta_new['compression'] = meta['compression'] ++ ++ elif 'encrypted' in meta: ++ s = meta['encrypted'] ++ if s == 'True': ++ meta_new['encryption'] = 'AES' ++ meta_new['compression'] = 'BZIP2' ++ ++ elif s == 'False': ++ meta_new['encryption'] = 'None' ++ meta_new['compression'] = 'None' ++ ++ elif s.startswith('AES/'): ++ meta_new['encryption'] = 'AES' ++ meta_new['compression'] = s[4:] ++ ++ elif s.startswith('PLAIN/'): ++ meta_new['encryption'] = 'None' ++ meta_new['compression'] = s[6:] ++ else: ++ raise RuntimeError('Unsupported encryption') ++ ++ if meta_new['compression'] == 'BZ2': ++ meta_new['compression'] = 'BZIP2' ++ ++ if meta_new['compression'] == 'NONE': ++ meta_new['compression'] = 'None' ++ else: ++ meta_new['encryption'] = 'None' ++ meta_new['compression'] = 'None' ++ ++ # Extract metadata (pre 2.x versions use multiple headers) ++ if any(k.startswith('meta') for k in meta): ++ parts = [ meta[k] for k in sorted(meta.keys()) ++ if k.startswith('meta') ] ++ meta_new['data'] = ''.join(parts) ++ else: ++ try: ++ meta_new['data'] = meta['data'] ++ except KeyError: ++ raise CorruptedObjectError('meta key data is missing') ++ ++ if not self.passphrase: ++ return meta_new ++ ++ meta_buf = b64decode(meta_new['data']) ++ off = 0 ++ def read(len_): ++ nonlocal off ++ tmp = meta_buf[off:off+len_] ++ off += len_ ++ return tmp ++ ++ len_ = struct.unpack(b'<B', read(LEN_BYTES))[0] ++ nonce = read(len_) ++ key = sha256(self.passphrase + nonce) ++ cipher = aes_cipher(key) ++ hmac_ = hmac.new(key, digestmod=hashlib.sha256) ++ hash_ = read(HMAC_SIZE) ++ meta_buf = meta_buf[off:] ++ meta_buf_plain = cipher.decrypt(meta_buf) ++ hmac_.update(meta_buf_plain) ++ hash_ = cipher.decrypt(hash_) ++ ++ if not hmac.compare_digest(hash_, hmac_.digest()): ++ raise CorruptedObjectError('HMAC mismatch') ++ ++ obj_id = nonce[TIME_BYTES:].decode('utf-8') ++ meta_key = sha256(self.passphrase + nonce + b'meta') ++ meta_new['nonce'] = b64encode(nonce) ++ meta_new['payload_offset'] = LEN_BYTES + len(nonce) ++ meta_new['data'] = b64encode(aes_cipher(meta_key).encrypt(meta_buf_plain)) ++ meta_new['object_id'] = b64encode(obj_id.encode('utf-8')) ++ meta_new['signature'] = calc_legacy_meta_checksum(meta_new, meta_key) ++ ++ return meta_new ++ ++ def _convert_legacy_metadata1(self, metadata): ++ if not isinstance(metadata, dict): ++ raise CorruptedObjectError('metadata should be dict, not %s' % type(metadata)) ++ ++ for mkey in ('encryption', 'compression', 'data'): ++ if mkey not in metadata: ++ raise CorruptedObjectError('meta key %s is missing' % mkey) ++ ++ encr_alg = metadata['encryption'] ++ encrypted = (encr_alg != 'None') ++ ++ if encrypted and self.passphrase is None: ++ raise CorruptedObjectError('Encrypted object and no passphrase supplied') ++ ++ elif not encrypted and self.passphrase is not None: ++ raise ObjectNotEncrypted() ++ ++ try: ++ meta_buf = b64decode(metadata['data']) ++ except binascii.Error: ++ raise CorruptedObjectError('Invalid metadata, b64decode failed') ++ ++ if not encrypted: ++ try: ++ meta2 = 
safe_unpickle(meta_buf, encoding='latin1') ++ except pickle.UnpicklingError as exc: ++ raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc) ++ if meta2 is None: ++ meta2 = dict() ++ metadata['data'] = freeze_basic_mapping(meta2) ++ metadata['format_version'] = 2 ++ return metadata ++ ++ # Encrypted ++ for mkey in ('nonce', 'signature', 'object_id'): ++ if mkey not in metadata: ++ raise CorruptedObjectError('meta key %s is missing' % mkey) ++ ++ nonce = b64decode(metadata['nonce']) ++ meta_key = sha256(self.passphrase + nonce + b'meta') ++ meta_sig = calc_legacy_meta_checksum(metadata, meta_key) ++ if not hmac.compare_digest(metadata['signature'], meta_sig): ++ raise CorruptedObjectError('HMAC mismatch') ++ ++ buf = aes_cipher(meta_key).decrypt(meta_buf) ++ try: ++ meta2 = safe_unpickle(buf, encoding='latin1') ++ except pickle.UnpicklingError as exc: ++ raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc) ++ if meta2 is None: ++ meta2 = dict() ++ ++ meta_buf = freeze_basic_mapping(meta2) ++ metadata['nonce'] = nonce ++ metadata['object_id'] = b64decode(metadata['object_id']).decode('utf-8') ++ metadata['data'] = aes_cipher(meta_key).encrypt(meta_buf) ++ metadata['format_version'] = 2 ++ metadata['signature'] = checksum_basic_mapping(metadata, meta_key) ++ ++ return metadata ++ + class CompressFilter(object): + '''Compress data while writing''' + +@@ -675,3 +880,107 @@ class ObjectNotEncrypted(Exception): + ''' + + pass ++ ++ ++def calc_legacy_meta_checksum(metadata, key): ++ # This works most of the time, so we still try to validate the ++ # signature. But in general, the pickle output is not unique so this is ++ # not a good way to compute a checksum. ++ chk = hmac.new(key, digestmod=hashlib.sha256) ++ for mkey in sorted(metadata.keys()): ++ assert isinstance(mkey, str) ++ if mkey == 'signature': ++ continue ++ val = metadata[mkey] ++ if isinstance(val, str): ++ val = val.encode('utf-8') ++ elif not isinstance(val, (bytes, bytearray)): ++ val = pickle.dumps(val, 2) ++ chk.update(mkey.encode('utf-8') + val) ++ return b64encode(chk.digest()) ++ ++class LegacyDecryptDecompressFilter(io.RawIOBase): ++ '''Decrypt and Decompress data while reading ++ ++ Reader has to read the entire stream in order for HMAC ++ checking to work. ++ ''' ++ ++ def __init__(self, fh, key, decomp): ++ '''Initialize ++ ++ *fh* should be a file-like object and may be unbuffered. ++ ''' ++ super().__init__() ++ ++ self.fh = fh ++ self.decomp = decomp ++ self.hmac_checked = False ++ self.cipher = aes_cipher(key) ++ self.hmac = hmac.new(key, digestmod=hashlib.sha256) ++ self.hash = fh.read(HMAC_SIZE) ++ ++ def discard_input(self): ++ while True: ++ buf = self.fh.read(BUFSIZE) ++ if not buf: ++ break ++ ++ def _decrypt(self, buf): ++ # Work around https://bugs.launchpad.net/pycrypto/+bug/1256172 ++ # cipher.decrypt refuses to work with anything but bytes ++ if not isinstance(buf, bytes): ++ buf = bytes(buf) ++ ++ len_ = len(buf) ++ buf = self.cipher.decrypt(buf) ++ assert len(buf) == len_ ++ return buf ++ ++ def read(self, size=-1): ++ '''Read up to *size* bytes ++ ++ This method is currently buggy and may also return *more* ++ than *size* bytes. Callers should be prepared to handle ++ that. This is because some of the used (de)compression modules ++ don't support output limiting. 
++ ''' ++ ++ if size == -1: ++ return self.readall() ++ elif size == 0: ++ return b'' ++ ++ buf = None ++ while not buf: ++ buf = self.fh.read(size) ++ if not buf and not self.hmac_checked: ++ if not hmac.compare_digest(self._decrypt(self.hash), ++ self.hmac.digest()): ++ raise CorruptedObjectError('HMAC mismatch') ++ elif self.decomp and self.decomp.unused_data: ++ raise CorruptedObjectError('Data after end of compressed stream') ++ else: ++ self.hmac_checked = True ++ return b'' ++ elif not buf: ++ return b'' ++ ++ buf = self._decrypt(buf) ++ if not self.decomp: ++ break ++ ++ buf = decompress(self.decomp, buf) ++ ++ self.hmac.update(buf) ++ return buf ++ ++ def close(self, *a, **kw): ++ self.fh.close(*a, **kw) ++ ++ def __enter__(self): ++ return self ++ ++ def __exit__(self, *a): ++ self.close() ++ return False +diff --git a/src/s3ql/backends/local.py b/src/s3ql/backends/local.py +index a8ebf..03cfd 100644 +--- a/src/s3ql/backends/local.py ++++ b/src/s3ql/backends/local.py +@@ -18,6 +18,9 @@ import io + import os + import shutil + ++from ..upgrade_support import safe_unpickle_fh, pickle ++SUPPORT_LEGACY_FORMAT=False ++ + log = logging.getLogger(__name__) + + class Backend(AbstractBackend, metaclass=ABCDocstMeta): +@@ -241,7 +244,14 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): + def _read_meta(fh): + buf = fh.read(9) + if not buf.startswith(b's3ql_1\n'): +- raise CorruptedObjectError('Invalid object header: %r' % buf) ++ if SUPPORT_LEGACY_FORMAT: ++ fh.seek(0) ++ try: ++ return safe_unpickle_fh(fh, encoding='latin1') ++ except pickle.UnpicklingError as exc: ++ raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc) ++ else: ++ raise CorruptedObjectError('Invalid object header: %r' % buf) + + len_ = struct.unpack('<H', buf[-2:])[0] + try: +diff --git a/src/s3ql/backends/s3c.py b/src/s3ql/backends/s3c.py +index 0a5a4..dc049 100644 +--- a/src/s3ql/backends/s3c.py ++++ b/src/s3ql/backends/s3c.py +@@ -34,6 +34,9 @@ import time + import ssl + import urllib.parse + ++from ..upgrade_support import safe_unpickle, pickle ++SUPPORT_LEGACY_FORMAT=False ++ + C_DAY_NAMES = [ 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun' ] + C_MONTH_NAMES = [ 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec' ] + +@@ -742,7 +745,31 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): + + format_ = resp.headers.get('%smeta-format' % self.hdr_prefix, 'raw') + if format_ != 'raw2': # Current +- raise CorruptedObjectError('Invalid metadata format: %s' % format_) ++ if SUPPORT_LEGACY_FORMAT: ++ meta = CaseInsensitiveDict() ++ pattern = re.compile(r'^%smeta-(.+)$' % re.escape(self.hdr_prefix), ++ re.IGNORECASE) ++ for fname in resp.headers: ++ hit = pattern.search(fname) ++ if hit: ++ meta[hit.group(1)] = resp.headers[fname] ++ ++ if format_ == 'raw': ++ return meta ++ ++ # format_ == pickle ++ buf = ''.join(meta[x] for x in sorted(meta) if x.lower().startswith('data-')) ++ if 'md5' in meta and md5sum_b64(buf.encode('us-ascii')) != meta['md5']: ++ log.warning('MD5 mismatch in metadata for %s', obj_key) ++ raise BadDigestError('BadDigest', 'Meta MD5 for %s does not match' % obj_key) ++ try: ++ return safe_unpickle(b64decode(buf), encoding='latin1') ++ except binascii.Error: ++ raise CorruptedObjectError('Corrupted metadata, b64decode failed') ++ except pickle.UnpicklingError as exc: ++ raise CorruptedObjectError('Corrupted metadata, pickle says: %s' % exc) ++ else: ++ raise CorruptedObjectError('Invalid metadata format: %s' % format_) + + parts = [] + for i 
in count():
+diff --git a/src/s3ql/upgrade_support.py b/src/s3ql/upgrade_support.py
+new file mode 100644
+index 00000..17079
+--- /dev/null
++++ b/src/s3ql/upgrade_support.py
+@@ -0,0 +1,75 @@
++'''
++Routines for reading old metadata to allow upgrade.
++Forward-ported from Mercurial commit 773931c43368.
++'''
++
++from .logging import logging # Ensure use of custom logger class
++import pickletools
++import pickle
++import codecs
++import io
++
++log = logging.getLogger(__name__)
++
++SAFE_UNPICKLE_OPCODES = {'BININT', 'BININT1', 'BININT2', 'LONG1', 'LONG4',
++                         'BINSTRING', 'SHORT_BINSTRING', 'GLOBAL',
++                         'NONE', 'NEWTRUE', 'NEWFALSE', 'BINUNICODE',
++                         'BINFLOAT', 'EMPTY_LIST', 'APPEND', 'APPENDS',
++                         'LIST', 'EMPTY_TUPLE', 'TUPLE', 'TUPLE1', 'TUPLE2',
++                         'TUPLE3', 'EMPTY_DICT', 'DICT', 'SETITEM',
++                         'SETITEMS', 'POP', 'DUP', 'MARK', 'POP_MARK',
++                         'BINGET', 'LONG_BINGET', 'BINPUT', 'LONG_BINPUT',
++                         'PROTO', 'STOP', 'REDUCE'}
++
++SAFE_UNPICKLE_GLOBAL_NAMES = { ('__builtin__', 'bytearray'),
++                               ('__builtin__', 'set'),
++                               ('__builtin__', 'frozenset'),
++                               ('_codecs', 'encode') }
++SAFE_UNPICKLE_GLOBAL_OBJS = { bytearray, set, frozenset, codecs.encode }
++
++class SafeUnpickler(pickle.Unpickler):
++    def find_class(self, module, name):
++        if (module, name) not in SAFE_UNPICKLE_GLOBAL_NAMES:
++            raise pickle.UnpicklingError("global '%s.%s' is unsafe" %
++                                         (module, name))
++        ret = super().find_class(module, name)
++        if ret not in SAFE_UNPICKLE_GLOBAL_OBJS:
++            raise pickle.UnpicklingError("global '%s.%s' is unsafe" %
++                                         (module, name))
++        return ret
++
++
++def safe_unpickle_fh(fh, fix_imports=True, encoding="ASCII",
++                     errors="strict"):
++    '''Safely unpickle untrusted data from *fh*
++
++    *fh* must be seekable.
++    '''
++
++    if not fh.seekable():
++        raise TypeError('*fh* must be seekable')
++    pos = fh.tell()
++
++    # First make sure that we know all used opcodes
++    try:
++        for (opcode, arg, _) in pickletools.genops(fh):
++            if opcode.proto > 2 or opcode.name not in SAFE_UNPICKLE_OPCODES:
++                raise pickle.UnpicklingError('opcode %s is unsafe' % opcode.name)
++    except (ValueError, EOFError):
++        raise pickle.UnpicklingError('corrupted data')
++
++    fh.seek(pos)
++
++    # Then use a custom Unpickler to ensure that we only give access to
++    # specific, whitelisted globals. Note that with the above opcodes, there is
++    # no way to trigger attribute access, so "brachiating" from a white listed
++    # object to __builtins__ is not possible.
++    return SafeUnpickler(fh, fix_imports=fix_imports,
++                         encoding=encoding, errors=errors).load()
++
++def safe_unpickle(buf, fix_imports=True, encoding="ASCII",
++                  errors="strict"):
++    '''Safely unpickle untrusted data in *buf*'''
++
++    return safe_unpickle_fh(io.BytesIO(buf), fix_imports=fix_imports,
++                            encoding=encoding, errors=errors)
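
The upgrade_support module added above is what lets "s3qladm upgrade" read the pickled metadata written by jessie's S3QL without the usual pickle code-execution risk: the opcode stream is vetted first, and only a small whitelist of globals may ever be resolved. The following is a minimal usage sketch, not part of the patch; it assumes the patched s3ql source tree is importable, and the Evil class and sample metadata dict are invented for illustration. A dict of basic types round-trips, while a pickle that references an arbitrary callable is refused before anything is executed:

    import pickle
    from s3ql.upgrade_support import safe_unpickle  # provided by this patch

    # Metadata in the old style contains only basic types, so every opcode
    # is on the whitelist and no global lookup is needed.
    legacy_meta = {'encryption': 'AES', 'compression': 'BZIP2', 'size': 4096}
    assert safe_unpickle(pickle.dumps(legacy_meta, protocol=2)) == legacy_meta

    # A payload that tries to call os.system on unpickling is rejected by
    # SafeUnpickler.find_class before the callable can be resolved.
    class Evil:
        def __reduce__(self):
            import os
            return (os.system, ('echo pwned',))

    try:
        safe_unpickle(pickle.dumps(Evil(), protocol=2))
    except pickle.UnpicklingError as exc:
        print('refused:', exc)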