From 270d5ddc31c26b62379e3caa9044dd75ccc71847 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Picca=20Fr=C3=A9d=C3=A9ric-Emmanuel?= Date: Sun, 4 Mar 2018 10:20:27 +0100 Subject: New upstream version 0.7.0+dfsg --- silx/opencl/codec/__init__.py | 0 silx/opencl/codec/byte_offset.py | 439 +++++++++++++++++++++++++++++ silx/opencl/codec/setup.py | 43 +++ silx/opencl/codec/test/__init__.py | 37 +++ silx/opencl/codec/test/test_byte_offset.py | 317 +++++++++++++++++++++ 5 files changed, 836 insertions(+) create mode 100644 silx/opencl/codec/__init__.py create mode 100644 silx/opencl/codec/byte_offset.py create mode 100644 silx/opencl/codec/setup.py create mode 100644 silx/opencl/codec/test/__init__.py create mode 100644 silx/opencl/codec/test/test_byte_offset.py (limited to 'silx/opencl/codec') diff --git a/silx/opencl/codec/__init__.py b/silx/opencl/codec/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/silx/opencl/codec/byte_offset.py b/silx/opencl/codec/byte_offset.py new file mode 100644 index 0000000..565b0c5 --- /dev/null +++ b/silx/opencl/codec/byte_offset.py @@ -0,0 +1,439 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Project: Sift implementation in Python + OpenCL +# https://github.com/silx-kit/silx +# +# Copyright (C) 2013-2018 European Synchrotron Radiation Facility, Grenoble, France +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +""" +This module provides a class for CBF byte offset compression/decompression. +""" + +from __future__ import division, print_function, with_statement + +__authors__ = ["Jérôme Kieffer"] +__contact__ = "jerome.kieffer@esrf.eu" +__license__ = "MIT" +__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France" +__date__ = "24/10/2017" +__status__ = "production" + + +import functools +import os +import numpy +from ..common import ocl, pyopencl +from ..processing import BufferDescription, EventDescription, OpenclProcessing + +import logging +logger = logging.getLogger(__name__) + +if pyopencl: + import pyopencl.version + if pyopencl.version.VERSION < (2016, 0): + from pyopencl.scan import GenericScanKernel, GenericDebugScanKernel + else: + from pyopencl.algorithm import GenericScanKernel + from pyopencl.scan import GenericDebugScanKernel +else: + logger.warning("No PyOpenCL, no byte-offset, please see fabio") + + +class ByteOffset(OpenclProcessing): + """Perform the byte offset compression/decompression on the GPU + + See :class:`OpenclProcessing` for optional arguments description. 
+ + :param int raw_size: + Size of the raw stream for decompression. + It can be (slightly) larger than the array. + :param int dec_size: + Size of the decompression output array + (mandatory for decompression) + """ + + def __init__(self, raw_size=None, dec_size=None, + ctx=None, devicetype="all", + platformid=None, deviceid=None, + block_size=None, profile=False): + OpenclProcessing.__init__(self, ctx=ctx, devicetype=devicetype, + platformid=platformid, deviceid=deviceid, + block_size=block_size, profile=profile) + if self.block_size is None: + self.block_size = self.device.max_work_group_size + wg = self.block_size + + buffers = [BufferDescription("counter", 1, numpy.int32, None)] + + if raw_size is None: + self.raw_size = -1 + self.padded_raw_size = -1 + else: + self.raw_size = int(raw_size) + self.padded_raw_size = int((self.raw_size + wg - 1) & ~(wg - 1)) + buffers += [ + BufferDescription("raw", self.padded_raw_size, numpy.int8, None), + BufferDescription("mask", self.padded_raw_size, numpy.int32, None), + BufferDescription("values", self.padded_raw_size, numpy.int32, None), + BufferDescription("exceptions", self.padded_raw_size, numpy.int32, None) + ] + + if dec_size is None: + self.dec_size = None + else: + self.dec_size = numpy.int32(dec_size) + buffers += [ + BufferDescription("data_float", self.dec_size, numpy.float32, None), + BufferDescription("data_int", self.dec_size, numpy.int32, None) + ] + + self.allocate_buffers(buffers, use_array=True) + + self.compile_kernels([os.path.join("codec", "byte_offset")]) + self.kernels.__setattr__("scan", self._init_double_scan()) + self.kernels.__setattr__("compression_scan", + self._init_compression_scan()) + + def _init_double_scan(self): + """"generates a double scan on indexes and values in one operation""" + arguments = "__global int *value", "__global int *index" + int2 = pyopencl.tools.get_or_register_dtype("int2") + input_expr = "index[i]>0 ? (int2)(0, 0) : (int2)(value[i], 1)" + scan_expr = "a+b" + neutral = "(int2)(0,0)" + output_statement = "value[i] = item.s0; index[i+1] = item.s1;" + + if self.block_size > 256: + knl = GenericScanKernel(self.ctx, + dtype=int2, + arguments=arguments, + input_expr=input_expr, + scan_expr=scan_expr, + neutral=neutral, + output_statement=output_statement) + else: # MacOS on CPU + knl = GenericDebugScanKernel(self.ctx, + dtype=int2, + arguments=arguments, + input_expr=input_expr, + scan_expr=scan_expr, + neutral=neutral, + output_statement=output_statement) + return knl + + def decode(self, raw, as_float=False, out=None): + """This function actually performs the decompression by calling the kernels + + :param numpy.ndarray raw: The compressed data as a 1D numpy array of char. + :param bool as_float: True to decompress as float32, + False (default) to decompress as int32 + :param pyopencl.array out: pyopencl array in which to place the result. + :return: The decompressed image as an pyopencl array. 
+ :rtype: pyopencl.array + """ + assert self.dec_size is not None, \ + "dec_size is a mandatory ByteOffset init argument for decompression" + + events = [] + with self.sem: + len_raw = numpy.int32(len(raw)) + if len_raw > self.padded_raw_size: + wg = self.block_size + self.raw_size = int(len(raw)) + self.padded_raw_size = (self.raw_size + wg - 1) & ~(wg - 1) + logger.info("increase raw buffer size to %s", self.padded_raw_size) + buffers = { + "raw": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int8), + "mask": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int32), + "exceptions": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int32), + "values": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int32), + } + self.cl_mem.update(buffers) + else: + wg = self.block_size + + evt = pyopencl.enqueue_copy(self.queue, self.cl_mem["raw"].data, + raw, + is_blocking=False) + events.append(EventDescription("copy raw H -> D", evt)) + evt = self.kernels.fill_int_mem(self.queue, (self.padded_raw_size,), (wg,), + self.cl_mem["mask"].data, + numpy.int32(self.padded_raw_size), + numpy.int32(0), + numpy.int32(0)) + events.append(EventDescription("memset mask", evt)) + evt = self.kernels.fill_int_mem(self.queue, (1,), (1,), + self.cl_mem["counter"].data, + numpy.int32(1), + numpy.int32(0), + numpy.int32(0)) + events.append(EventDescription("memset counter", evt)) + evt = self.kernels.mark_exceptions(self.queue, (self.padded_raw_size,), (wg,), + self.cl_mem["raw"].data, + len_raw, + numpy.int32(self.raw_size), + self.cl_mem["mask"].data, + self.cl_mem["values"].data, + self.cl_mem["counter"].data, + self.cl_mem["exceptions"].data) + events.append(EventDescription("mark exceptions", evt)) + nb_exceptions = numpy.empty(1, dtype=numpy.int32) + evt = pyopencl.enqueue_copy(self.queue, nb_exceptions, self.cl_mem["counter"].data, + is_blocking=False) + events.append(EventDescription("copy counter D -> H", evt)) + evt.wait() + nbexc = int(nb_exceptions[0]) + if nbexc == 0: + logger.info("nbexc %i", nbexc) + else: + evt = self.kernels.treat_exceptions(self.queue, (nbexc,), (1,), + self.cl_mem["raw"].data, + len_raw, + self.cl_mem["mask"].data, + self.cl_mem["exceptions"].data, + self.cl_mem["values"].data + ) + events.append(EventDescription("treat_exceptions", evt)) + + #self.cl_mem["copy_values"] = self.cl_mem["values"].copy() + #self.cl_mem["copy_mask"] = self.cl_mem["mask"].copy() + evt = self.kernels.scan(self.cl_mem["values"], + self.cl_mem["mask"], + queue=self.queue, + size=int(len_raw), + wait_for=(evt,)) + events.append(EventDescription("double scan", evt)) + #evt.wait() + if out is not None: + if out.dtype == numpy.float32: + copy_results = self.kernels.copy_result_float + else: + copy_results = self.kernels.copy_result_int + else: + if as_float: + out = self.cl_mem["data_float"] + copy_results = self.kernels.copy_result_float + else: + out = self.cl_mem["data_int"] + copy_results = self.kernels.copy_result_int + evt = copy_results(self.queue, (self.padded_raw_size,), (wg,), + self.cl_mem["values"].data, + self.cl_mem["mask"].data, + len_raw, + self.dec_size, + out.data + ) + events.append(EventDescription("copy_results", evt)) + #evt.wait() + if self.profile: + self.events += events + return out + + __call__ = decode + + def _init_compression_scan(self): + """Initialize CBF compression scan kernels""" + preamble = """ + int compressed_size(int diff) { + int abs_diff = abs(diff); + + if (abs_diff < 128) { + return 1; + } + else if 
(abs_diff < 32768) { + return 3; + } + else { + return 7; + } + } + + void write(const int index, + const int diff, + global char *output) { + int abs_diff = abs(diff); + + if (abs_diff < 128) { + output[index] = (char) diff; + } + else if (abs_diff < 32768) { + output[index] = -128; + output[index + 1] = (char) (diff >> 0); + output[index + 2] = (char) (diff >> 8); + } + else { + output[index] = -128; + output[index + 1] = 0; + output[index + 2] = -128; + output[index + 3] = (char) (diff >> 0); + output[index + 4] = (char) (diff >> 8); + output[index + 5] = (char) (diff >> 16); + output[index + 6] = (char) (diff >> 24); + } + } + """ + arguments = "__global const int *data, __global char *compressed, __global int *size" + input_expr = "compressed_size((i == 0) ? data[0] : (data[i] - data[i - 1]))" + scan_expr = "a+b" + neutral = "0" + output_statement = """ + if (prev_item == 0) { // 1st thread store compressed data size + size[0] = last_item; + } + write(prev_item, (i == 0) ? data[0] : (data[i] - data[i - 1]), compressed); + """ + + if self.block_size >= 64: + knl = GenericScanKernel(self.ctx, + dtype=numpy.int32, + preamble=preamble, + arguments=arguments, + input_expr=input_expr, + scan_expr=scan_expr, + neutral=neutral, + output_statement=output_statement) + else: # MacOS on CPU + knl = GenericDebugScanKernel(self.ctx, + dtype=numpy.int32, + preamble=preamble, + arguments=arguments, + input_expr=input_expr, + scan_expr=scan_expr, + neutral=neutral, + output_statement=output_statement) + return knl + + def encode(self, data, out=None): + """Compress data to CBF. + + :param data: The data to compress as a numpy array + (or a pyopencl Array) of int32. + :type data: Union[numpy.ndarray, pyopencl.array.Array] + :param pyopencl.array out: + pyopencl array of int8 in which to store the result. + The array should be large enough to store the compressed data. + :return: The compressed data as a pyopencl array. + If out is provided, this array shares the backing buffer, + but has the exact size of the compressed data and the queue + of the ByteOffset instance. 
+ :rtype: pyopencl.array + :raises ValueError: if out array is not large enough + """ + + events = [] + with self.sem: + if isinstance(data, pyopencl.array.Array): + d_data = data # Uses provided array + + else: # Copy data to device + data = numpy.ascontiguousarray(data, dtype=numpy.int32).ravel() + + # Make sure data array exists and is large enough + if ("data_input" not in self.cl_mem or + self.cl_mem["data_input"].size < data.size): + logger.info("increase data input buffer size to %s", data.size) + self.cl_mem.update({ + "data_input": pyopencl.array.empty(self.queue, + data.size, + dtype=numpy.int32)}) + d_data = self.cl_mem["data_input"] + + evt = pyopencl.enqueue_copy( + self.queue, d_data.data, data, is_blocking=False) + events.append(EventDescription("copy data H -> D", evt)) + + # Make sure compressed array exists and is large enough + compressed_size = d_data.size * 7 + if ("compressed" not in self.cl_mem or + self.cl_mem["compressed"].size < compressed_size): + logger.info("increase compressed buffer size to %s", compressed_size) + self.cl_mem.update({ + "compressed": pyopencl.array.empty(self.queue, + compressed_size, + dtype=numpy.int8)}) + d_compressed = self.cl_mem["compressed"] + d_size = self.cl_mem["counter"] # Shared with decompression + + evt = self.kernels.compression_scan(d_data, d_compressed, d_size) + events.append(EventDescription("compression scan", evt)) + byte_count = int(d_size.get()[0]) + + if out is None: + # Create out array from a sub-region of the compressed buffer + out = pyopencl.array.Array( + self.queue, + shape=(byte_count,), + dtype=numpy.int8, + allocator=functools.partial( + d_compressed.base_data.get_sub_region, + d_compressed.offset)) + + elif out.size < byte_count: + raise ValueError( + "Provided output buffer is not large enough: " + "requires %d bytes, got %d" % (byte_count, out.size)) + + else: # out.size >= byte_count + # Create an array with a sub-region of out and this class queue + out = pyopencl.array.Array( + self.queue, + shape=(byte_count,), + dtype=numpy.int8, + allocator=functools.partial(out.base_data.get_sub_region, + out.offset)) + + evt = pyopencl.enqueue_copy_buffer( + self.queue, d_compressed.data, out.data, byte_count=byte_count) + events.append( + EventDescription("copy D -> D: internal -> out", evt)) + + if self.profile: + self.events += events + + return out + + def encode_to_bytes(self, data): + """Compresses data to CBF and returns compressed data as bytes. + + Usage: + + Provided an image (`image`) stored as a numpy array of int32, + first, create a byte offset compression/decompression object: + + >>> from silx.opencl.codec.byte_offset import ByteOffset + >>> byte_offset_codec = ByteOffset() + + Then, compress an image into bytes: + + >>> compressed = byte_offset_codec.encode_to_bytes(image) + + :param data: The data to compress as a numpy array + (or a pyopencl Array) of int32. + :type data: Union[numpy.ndarray, pyopencl.array.Array] + :return: The compressed data as bytes. 
+ :rtype: bytes + """ + compressed_array = self.encode(data) + return compressed_array.get().tostring() diff --git a/silx/opencl/codec/setup.py b/silx/opencl/codec/setup.py new file mode 100644 index 0000000..4a5c1e5 --- /dev/null +++ b/silx/opencl/codec/setup.py @@ -0,0 +1,43 @@ +# coding: utf-8 +# +# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + +from __future__ import division + +__contact__ = "jerome.kieffer@esrf.eu" +__license__ = "MIT" +__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France" +__authors__ = ["J. Kieffer"] +__date__ = "13/10/2017" + +from numpy.distutils.misc_util import Configuration + + +def configuration(parent_package='', top_path=None): + config = Configuration('codec', parent_package, top_path) + config.add_subpackage('test') + return config + + +if __name__ == "__main__": + from numpy.distutils.core import setup + setup(configuration=configuration) diff --git a/silx/opencl/codec/test/__init__.py b/silx/opencl/codec/test/__init__.py new file mode 100644 index 0000000..ec76dd3 --- /dev/null +++ b/silx/opencl/codec/test/__init__.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +# +# Project: silx +# https://github.com/silx-kit/silx +# +# Copyright (C) 2013-2017 European Synchrotron Radiation Facility, Grenoble, France +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +__authors__ = ["J. Kieffer"] +__license__ = "MIT" +__date__ = "13/10/2017" + +import unittest +from . 
import test_byte_offset + + +def suite(): + testSuite = unittest.TestSuite() + testSuite.addTest(test_byte_offset.suite()) + + return testSuite diff --git a/silx/opencl/codec/test/test_byte_offset.py b/silx/opencl/codec/test/test_byte_offset.py new file mode 100644 index 0000000..2bfa1d3 --- /dev/null +++ b/silx/opencl/codec/test/test_byte_offset.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Project: Byte-offset decompression in OpenCL +# https://github.com/silx-kit/silx +# +# Copyright (C) 2013-2018 European Synchrotron Radiation Facility, +# Grenoble, France +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +""" +Test suite for byte-offset decompression +""" + +from __future__ import division, print_function + +__authors__ = ["Jérôme Kieffer"] +__contact__ = "jerome.kieffer@esrf.eu" +__license__ = "MIT" +__copyright__ = "2013 European Synchrotron Radiation Facility, Grenoble, France" +__date__ = "10/11/2017" + +import sys +import time +import logging +import numpy +from silx.opencl.common import ocl, pyopencl +from silx.opencl.codec import byte_offset +try: + import fabio +except ImportError: + fabio = None +import unittest +logger = logging.getLogger(__name__) + + +@unittest.skipUnless(ocl and fabio and pyopencl, + "PyOpenCl or fabio is missing") +class TestByteOffset(unittest.TestCase): + + @staticmethod + def _create_test_data(shape, nexcept, lam=200): + """Create test (image, compressed stream) pair. 
+ + :param shape: Shape of test image + :param int nexcept: Number of exceptions in the image + :param lam: Expectation of interval argument for numpy.random.poisson + :return: (reference image array, compressed stream) + """ + size = numpy.prod(shape) + ref = numpy.random.poisson(lam, numpy.prod(shape)) + exception_loc = numpy.random.randint(0, size, size=nexcept) + exception_value = numpy.random.randint(0, 1000000, size=nexcept) + ref[exception_loc] = exception_value + ref.shape = shape + + raw = fabio.compression.compByteOffset(ref) + return ref, raw + + def test_decompress(self): + """ + tests the byte offset decompression on GPU + """ + ref, raw = self._create_test_data(shape=(91, 97), nexcept=229) + #ref, raw = self._create_test_data(shape=(7, 9), nexcept=0) + + size = numpy.prod(ref.shape) + + try: + bo = byte_offset.ByteOffset(raw_size=len(raw), dec_size=size, profile=True) + except (RuntimeError, pyopencl.RuntimeError) as err: + logger.warning(err) + if sys.platform == "darwin": + raise unittest.SkipTest("Byte-offset decompression is known to be buggy on MacOS-CPU") + else: + raise err + print(bo.block_size) + + t0 = time.time() + res_cy = fabio.compression.decByteOffset(raw) + t1 = time.time() + res_cl = bo.decode(raw) + t2 = time.time() + delta_cy = abs(ref.ravel() - res_cy).max() + delta_cl = abs(ref.ravel() - res_cl.get()).max() + + logger.debug("Global execution time: fabio %.3fms, OpenCL: %.3fms.", + 1000.0 * (t1 - t0), + 1000.0 * (t2 - t1)) + bo.log_profile() + #print(ref) + #print(res_cl.get()) + self.assertEqual(delta_cy, 0, "Checks fabio works") + self.assertEqual(delta_cl, 0, "Checks opencl works") + + def test_many_decompress(self, ntest=10): + """ + tests the byte offset decompression on GPU, many images to ensure there + is not leaking in memory + """ + shape = (991, 997) + size = numpy.prod(shape) + ref, raw = self._create_test_data(shape=shape, nexcept=0, lam=100) + + try: + bo = byte_offset.ByteOffset(len(raw), size, profile=False) + except (RuntimeError, pyopencl.RuntimeError) as err: + logger.warning(err) + if sys.platform == "darwin": + raise unittest.SkipTest("Byte-offset decompression is known to be buggy on MacOS-CPU") + else: + raise err + t0 = time.time() + res_cy = fabio.compression.decByteOffset(raw) + t1 = time.time() + res_cl = bo(raw) + t2 = time.time() + delta_cy = abs(ref.ravel() - res_cy).max() + delta_cl = abs(ref.ravel() - res_cl.get()).max() + self.assertEqual(delta_cy, 0, "Checks fabio works") + self.assertEqual(delta_cl, 0, "Checks opencl works") + logger.debug("Global execution time: fabio %.3fms, OpenCL: %.3fms.", + 1000.0 * (t1 - t0), + 1000.0 * (t2 - t1)) + + for i in range(ntest): + ref, raw = self._create_test_data(shape=shape, nexcept=2729, lam=200) + + t0 = time.time() + res_cy = fabio.compression.decByteOffset(raw) + t1 = time.time() + res_cl = bo(raw) + t2 = time.time() + delta_cy = abs(ref.ravel() - res_cy).max() + delta_cl = abs(ref.ravel() - res_cl.get()).max() + self.assertEqual(delta_cy, 0, "Checks fabio works #%i" % i) + self.assertEqual(delta_cl, 0, "Checks opencl works #%i" % i) + + logger.debug("Global execution time: fabio %.3fms, OpenCL: %.3fms.", + 1000.0 * (t1 - t0), + 1000.0 * (t2 - t1)) + + def test_encode(self): + """Test byte offset compression""" + ref, raw = self._create_test_data(shape=(2713, 2719), nexcept=2729) + + try: + bo = byte_offset.ByteOffset(len(raw), ref.size, profile=True) + except (RuntimeError, pyopencl.RuntimeError) as err: + logger.warning(err) + raise err + + t0 = time.time() + compressed_array = 
bo.encode(ref) + t1 = time.time() + + compressed_stream = compressed_array.get().tostring() + self.assertEqual(raw, compressed_stream) + + logger.debug("Global execution time: OpenCL: %.3fms.", + 1000.0 * (t1 - t0)) + bo.log_profile() + + def test_encode_to_array(self): + """Test byte offset compression while providing an out array""" + + ref, raw = self._create_test_data(shape=(2713, 2719), nexcept=2729) + + try: + bo = byte_offset.ByteOffset(profile=True) + except (RuntimeError, pyopencl.RuntimeError) as err: + logger.warning(err) + raise err + # Test with out buffer too small + out = pyopencl.array.empty(bo.queue, (10,), numpy.int8) + with self.assertRaises(ValueError): + bo.encode(ref, out) + + # Test with out buffer too big + out = pyopencl.array.empty(bo.queue, (len(raw) + 10,), numpy.int8) + + compressed_array = bo.encode(ref, out) + + # Get size from returned array + compressed_size = compressed_array.size + self.assertEqual(compressed_size, len(raw)) + + # Get data from out array, read it from bo object queue + out_bo_queue = out.with_queue(bo.queue) + compressed_stream = out_bo_queue.get().tostring()[:compressed_size] + self.assertEqual(raw, compressed_stream) + + def test_encode_to_bytes(self): + """Test byte offset compression to bytes""" + ref, raw = self._create_test_data(shape=(2713, 2719), nexcept=2729) + + try: + bo = byte_offset.ByteOffset(profile=True) + except (RuntimeError, pyopencl.RuntimeError) as err: + logger.warning(err) + raise err + + t0 = time.time() + res_fabio = fabio.compression.compByteOffset(ref) + t1 = time.time() + compressed_stream = bo.encode_to_bytes(ref) + t2 = time.time() + + self.assertEqual(raw, compressed_stream) + + logger.debug("Global execution time: fabio %.3fms, OpenCL: %.3fms.", + 1000.0 * (t1 - t0), + 1000.0 * (t2 - t1)) + bo.log_profile() + + def test_encode_to_bytes_from_array(self): + """Test byte offset compression to bytes from a pyopencl array. 
+ """ + ref, raw = self._create_test_data(shape=(2713, 2719), nexcept=2729) + + try: + bo = byte_offset.ByteOffset(profile=True) + except (RuntimeError, pyopencl.RuntimeError) as err: + logger.warning(err) + raise err + + d_ref = pyopencl.array.to_device( + bo.queue, ref.astype(numpy.int32).ravel()) + + t0 = time.time() + res_fabio = fabio.compression.compByteOffset(ref) + t1 = time.time() + compressed_stream = bo.encode_to_bytes(d_ref) + t2 = time.time() + + self.assertEqual(raw, compressed_stream) + + logger.debug("Global execution time: fabio %.3fms, OpenCL: %.3fms.", + 1000.0 * (t1 - t0), + 1000.0 * (t2 - t1)) + bo.log_profile() + + def test_many_encode(self, ntest=10): + """Test byte offset compression with many image""" + shape = (991, 997) + ref, raw = self._create_test_data(shape=shape, nexcept=0, lam=100) + + try: + bo = byte_offset.ByteOffset(profile=False) + except (RuntimeError, pyopencl.RuntimeError) as err: + logger.warning(err) + raise err + + bo_durations = [] + + t0 = time.time() + res_fabio = fabio.compression.compByteOffset(ref) + t1 = time.time() + compressed_stream = bo.encode_to_bytes(ref) + t2 = time.time() + bo_durations.append(1000.0 * (t2 - t1)) + + self.assertEqual(raw, compressed_stream) + logger.debug("Global execution time: fabio %.3fms, OpenCL: %.3fms.", + 1000.0 * (t1 - t0), + 1000.0 * (t2 - t1)) + + for i in range(ntest): + ref, raw = self._create_test_data(shape=shape, nexcept=2729, lam=200) + + t0 = time.time() + res_fabio = fabio.compression.compByteOffset(ref) + t1 = time.time() + compressed_stream = bo.encode_to_bytes(ref) + t2 = time.time() + bo_durations.append(1000.0 * (t2 - t1)) + + self.assertEqual(raw, compressed_stream) + logger.debug("Global execution time: fabio %.3fms, OpenCL: %.3fms.", + 1000.0 * (t1 - t0), + 1000.0 * (t2 - t1)) + + logger.debug("OpenCL execution time: Mean: %fms, Min: %fms, Max: %fms", + numpy.mean(bo_durations), + numpy.min(bo_durations), + numpy.max(bo_durations)) + + +def suite(): + test_suite = unittest.TestSuite() + test_suite.addTest(TestByteOffset("test_decompress")) + test_suite.addTest(TestByteOffset("test_many_decompress")) + test_suite.addTest(TestByteOffset("test_encode")) + test_suite.addTest(TestByteOffset("test_encode_to_array")) + test_suite.addTest(TestByteOffset("test_encode_to_bytes")) + test_suite.addTest(TestByteOffset("test_encode_to_bytes_from_array")) + test_suite.addTest(TestByteOffset("test_many_encode")) + return test_suite -- cgit v1.2.3