summaryrefslogtreecommitdiff
path: root/tests/test_api_base.py
blob: 39b83fa484e96d392acd44ace564404b4cc98499 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# -*- coding: utf-8 -*-
# vim: ts=4 sw=4 et ai si
#
# Copyright (c) 2012-2014 Intel, Inc.
# License: GPLv2
# Author: Artem Bityutskiy <artem.bityutskiy@linux.intel.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2,
# as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.

"""
This test verifies the base bmap creation and copying API functionality. It
generates a random sparse file, then creates a bmap fir this file and copies it
to a different file using the bmap. Then it compares the original random sparse
file and the copy and verifies that they are identical.
"""

# Disable the following pylint recommendations:
#   * Too many public methods (R0904)
#   * Too many local variables (R0914)
#   * Too many statements (R0915)
# pylint: disable=R0904
# pylint: disable=R0914
# pylint: disable=R0915

import os
import sys
import tempfile
import filecmp
import subprocess
from six.moves import zip_longest
from tests import helpers
from bmaptools import BmapHelpers, BmapCreate, Filemap

# This is a work-around for Centos 6
try:
    import unittest2 as unittest  # pylint: disable=F0401
except ImportError:
    import unittest


class Error(Exception):
    """A class for exceptions generated by this test."""
    pass


def _compare_holes(file1, file2):
    """
    Make sure that files 'file1' and 'file2' have holes at the same places.
    The 'file1' and 'file2' arguments may be full file paths or file objects.
    """

    filemap1 = Filemap.filemap(file1)
    filemap2 = Filemap.filemap(file2)

    iterator1 = filemap1.get_unmapped_ranges(0, filemap1.blocks_cnt)
    iterator2 = filemap2.get_unmapped_ranges(0, filemap2.blocks_cnt)

    iterator = zip_longest(iterator1, iterator2)
    for range1, range2 in iterator:
        if range1 != range2:
            raise Error("mismatch for hole %d-%d, it is %d-%d in file2"
                        % (range1[0], range1[1], range2[0], range2[1]))


def _generate_compressed_files(file_path, delete=True):
    """
    This is a generator which yields compressed versions of a file
    'file_path'.

    The 'delete' argument specifies whether the compressed files that this
    generator yields have to be automatically deleted.
    """

    # Make sure the temporary files start with the same name as 'file_obj' in
    # order to simplify debugging.
    prefix = os.path.splitext(os.path.basename(file_path))[0] + '.'
    # Put the temporary files in the directory with 'file_obj'
    directory = os.path.dirname(file_path)

    compressors = [("bzip2",  None, ".bz2",   "-c -k"),
                   ("pbzip2", None, ".p.bz2", "-c -k"),
                   ("gzip",   None, ".gz",    "-c"),
                   ("pigz",   None, ".p.gz",  "-c -k"),
                   ("xz",     None, ".xz",    "-c -k"),
                   ("lzop",   None, ".lzo",   "-c -k"),
                   ("lz4",    None, ".lz4",   "-c -k"),
                   ("zstd",   None, ".zst",   "-c -k"),
                   # The "-P -C /" trick is used to avoid silly warnings:
                   # "tar: Removing leading `/' from member names"
                   ("bzip2", "tar", ".tar.bz2", "-c -j -O -P -C /"),
                   ("gzip",  "tar", ".tar.gz",  "-c -z -O -P -C /"),
                   ("xz",    "tar", ".tar.xz",  "-c -J -O -P -C /"),
                   ("lzop",  "tar", ".tar.lzo", "-c --lzo -O -P -C /"),
                   ("lz4",   "tar", ".tar.lz4", "-c -Ilz4 -O -P -C /"),
                   ("zstd",  "tar", ".tar.zst", "-c -Izstd -O -P -C /"),
                   ("zip",   None,  ".zip",     "-q -j -")]

    for decompressor, archiver, suffix, options in compressors:
        if not BmapHelpers.program_is_available(decompressor):
            continue
        if archiver and not BmapHelpers.program_is_available(archiver):
            continue

        tmp_file_obj = tempfile.NamedTemporaryFile('wb+', prefix=prefix,
                                                   delete=delete, dir=directory,
                                                   suffix=suffix)

        if archiver:
            args = archiver + " " + options + " " + file_path
        else:
            args = decompressor + " " + options + " " + file_path
        child_process = subprocess.Popen(args, shell=True, stdout=tmp_file_obj)
        child_process.wait()
        tmp_file_obj.flush()
        yield tmp_file_obj.name
        tmp_file_obj.close()


def _do_test(image, image_size, delete=True):
    """
    A basic test for the bmap creation and copying functionality. It first
    generates a bmap for file 'image', and then copies the sparse file to a
    different file, and then checks that the original file and the copy are
    identical.

    The 'image_size' argument is size of the image in bytes. The 'delete'
    argument specifies whether the temporary files that this function creates
    have to be automatically deleted.
    """

    try:
        Filemap.filemap(image)
    except Filemap.ErrorNotSupp as e:
        sys.stderr.write('%s\n' % e)
        return

    # Make sure the temporary files start with the same name as 'image' in
    # order to simplify debugging.
    prefix = os.path.splitext(os.path.basename(image))[0] + '.'
    # Put the temporary files in the directory with the image
    directory = os.path.dirname(image)

    # Create and open a temporary file for a copy of the image
    f_copy = tempfile.NamedTemporaryFile("wb+", prefix=prefix,
                                         delete=delete, dir=directory,
                                         suffix=".copy")

    # Create and open 2 temporary files for the bmap
    f_bmap1 = tempfile.NamedTemporaryFile("w+", prefix=prefix,
                                          delete=delete, dir=directory,
                                          suffix=".bmap1")
    f_bmap2 = tempfile.NamedTemporaryFile("w+", prefix=prefix,
                                          delete=delete, dir=directory,
                                          suffix=".bmap2")

    image_chksum = helpers.calculate_chksum(image)

    #
    # Pass 1: generate the bmap, copy and compare
    #

    # Create bmap for the random sparse file
    creator = BmapCreate.BmapCreate(image, f_bmap1.name)
    creator.generate()

    helpers.copy_and_verify_image(image, f_copy.name, f_bmap1.name,
                                  image_chksum, image_size)

    # Make sure that holes in the copy are identical to holes in the random
    # sparse file.
    _compare_holes(image, f_copy.name)

    #
    # Pass 2: same as pass 1, but use file objects instead of paths
    #

    creator = BmapCreate.BmapCreate(image, f_bmap2)
    creator.generate()
    helpers.copy_and_verify_image(image, f_copy.name, f_bmap2.name,
                                  image_chksum, image_size)
    _compare_holes(image, f_copy.name)

    # Make sure the bmap files generated at pass 1 and pass 2 are identical
    assert filecmp.cmp(f_bmap1.name, f_bmap2.name, False)

    #
    # Pass 3: test compressed files copying with bmap
    #

    for compressed in _generate_compressed_files(image, delete=delete):
        helpers.copy_and_verify_image(compressed, f_copy.name,
                                      f_bmap1.name, image_chksum, image_size)

        # Test without setting the size
        helpers.copy_and_verify_image(compressed, f_copy.name, f_bmap1.name,
                                      image_chksum, None)

        # Append a "file:" prefixe to make BmapCopy use urllib
        compressed = "file:" + compressed
        helpers.copy_and_verify_image(compressed, f_copy.name, f_bmap1.name,
                                      image_chksum, image_size)
        helpers.copy_and_verify_image(compressed, f_copy.name, f_bmap1.name,
                                      image_chksum, None)

    #
    # Pass 5: copy without bmap and make sure it is identical to the original
    # file.

    helpers.copy_and_verify_image(image, f_copy.name, None, image_chksum,
                                  image_size)
    helpers.copy_and_verify_image(image, f_copy.name, None, image_chksum, None)

    #
    # Pass 6: test compressed files copying without bmap
    #

    for compressed in _generate_compressed_files(image, delete=delete):
        helpers.copy_and_verify_image(compressed, f_copy.name, f_bmap1.name,
                                      image_chksum, image_size)

        # Test without setting the size
        helpers.copy_and_verify_image(compressed, f_copy.name, f_bmap1.name,
                                      image_chksum, None)

        # Append a "file:" prefix to make BmapCopy use urllib
        helpers.copy_and_verify_image(compressed, f_copy.name, f_bmap1.name,
                                      image_chksum, image_size)
        helpers.copy_and_verify_image(compressed, f_copy.name, f_bmap1.name,
                                      image_chksum, None)

    # Close temporary files, which will also remove them
    f_copy.close()
    f_bmap1.close()
    f_bmap2.close()


class TestCreateCopy(unittest.TestCase):
    """
    The test class for this unit tests. Basically executes the '_do_test()'
    function for different sparse files.
    """

    def test(self):  # pylint: disable=R0201
        """
        The test entry point. Executes the '_do_test()' function for files of
        different sizes, holes distribution and format.
        """

        # Delete all the test-related temporary files automatically
        delete = True
        # Create all the test-related temporary files in current directory
        directory = '.'

        iterator = helpers.generate_test_files(delete=delete,
                                               directory=directory)
        for f_image, image_size, _, _ in iterator:
            assert image_size == os.path.getsize(f_image.name)
            _do_test(f_image.name, image_size, delete=delete)