summaryrefslogtreecommitdiff
path: root/src/silx/opencl/codec/byte_offset.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/silx/opencl/codec/byte_offset.py')
-rw-r--r--src/silx/opencl/codec/byte_offset.py439
1 files changed, 439 insertions, 0 deletions
diff --git a/src/silx/opencl/codec/byte_offset.py b/src/silx/opencl/codec/byte_offset.py
new file mode 100644
index 0000000..9a52427
--- /dev/null
+++ b/src/silx/opencl/codec/byte_offset.py
@@ -0,0 +1,439 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Project: Sift implementation in Python + OpenCL
+# https://github.com/silx-kit/silx
+#
+# Copyright (C) 2013-2020 European Synchrotron Radiation Facility, Grenoble, France
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation
+# files (the "Software"), to deal in the Software without
+# restriction, including without limitation the rights to use,
+# copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following
+# conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+"""
+This module provides a class for CBF byte offset compression/decompression.
+"""
+
+from __future__ import division, print_function, with_statement
+
+__authors__ = ["Jérôme Kieffer"]
+__contact__ = "jerome.kieffer@esrf.eu"
+__license__ = "MIT"
+__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
+__date__ = "11/10/2018"
+__status__ = "production"
+
+
+import functools
+import os
+import numpy
+from ..common import ocl, pyopencl
+from ..processing import BufferDescription, EventDescription, OpenclProcessing
+
+import logging
+logger = logging.getLogger(__name__)
+
+if pyopencl:
+ import pyopencl.version
+ if pyopencl.version.VERSION < (2016, 0):
+ from pyopencl.scan import GenericScanKernel, GenericDebugScanKernel
+ else:
+ from pyopencl.algorithm import GenericScanKernel
+ from pyopencl.scan import GenericDebugScanKernel
+else:
+ logger.warning("No PyOpenCL, no byte-offset, please see fabio")
+
+
+class ByteOffset(OpenclProcessing):
+ """Perform the byte offset compression/decompression on the GPU
+
+ See :class:`OpenclProcessing` for optional arguments description.
+
+ :param int raw_size:
+ Size of the raw stream for decompression.
+ It can be (slightly) larger than the array.
+ :param int dec_size:
+ Size of the decompression output array
+ (mandatory for decompression)
+ """
+
+ def __init__(self, raw_size=None, dec_size=None,
+ ctx=None, devicetype="all",
+ platformid=None, deviceid=None,
+ block_size=None, profile=False):
+ OpenclProcessing.__init__(self, ctx=ctx, devicetype=devicetype,
+ platformid=platformid, deviceid=deviceid,
+ block_size=block_size, profile=profile)
+ if self.block_size is None:
+ self.block_size = self.device.max_work_group_size
+ wg = self.block_size
+
+ buffers = [BufferDescription("counter", 1, numpy.int32, None)]
+
+ if raw_size is None:
+ self.raw_size = -1
+ self.padded_raw_size = -1
+ else:
+ self.raw_size = int(raw_size)
+ self.padded_raw_size = int((self.raw_size + wg - 1) & ~(wg - 1))
+ buffers += [
+ BufferDescription("raw", self.padded_raw_size, numpy.int8, None),
+ BufferDescription("mask", self.padded_raw_size, numpy.int32, None),
+ BufferDescription("values", self.padded_raw_size, numpy.int32, None),
+ BufferDescription("exceptions", self.padded_raw_size, numpy.int32, None)
+ ]
+
+ if dec_size is None:
+ self.dec_size = None
+ else:
+ self.dec_size = numpy.int32(dec_size)
+ buffers += [
+ BufferDescription("data_float", self.dec_size, numpy.float32, None),
+ BufferDescription("data_int", self.dec_size, numpy.int32, None)
+ ]
+
+ self.allocate_buffers(buffers, use_array=True)
+
+ self.compile_kernels([os.path.join("codec", "byte_offset")])
+ self.kernels.__setattr__("scan", self._init_double_scan())
+ self.kernels.__setattr__("compression_scan",
+ self._init_compression_scan())
+
+ def _init_double_scan(self):
+ """"generates a double scan on indexes and values in one operation"""
+ arguments = "__global int *value", "__global int *index"
+ int2 = pyopencl.tools.get_or_register_dtype("int2")
+ input_expr = "index[i]>0 ? (int2)(0, 0) : (int2)(value[i], 1)"
+ scan_expr = "a+b"
+ neutral = "(int2)(0,0)"
+ output_statement = "value[i] = item.s0; index[i+1] = item.s1;"
+
+ if self.block_size > 256:
+ knl = GenericScanKernel(self.ctx,
+ dtype=int2,
+ arguments=arguments,
+ input_expr=input_expr,
+ scan_expr=scan_expr,
+ neutral=neutral,
+ output_statement=output_statement)
+ else: # MacOS on CPU
+ knl = GenericDebugScanKernel(self.ctx,
+ dtype=int2,
+ arguments=arguments,
+ input_expr=input_expr,
+ scan_expr=scan_expr,
+ neutral=neutral,
+ output_statement=output_statement)
+ return knl
+
    def decode(self, raw, as_float=False, out=None):
        """This function actually performs the decompression by calling the kernels

        :param numpy.ndarray raw: The compressed data as a 1D numpy array of char.
        :param bool as_float: True to decompress as float32,
                              False (default) to decompress as int32
        :param pyopencl.array out: pyopencl array in which to place the result.
        :return: The decompressed image as an pyopencl array.
        :rtype: pyopencl.array
        """
        assert self.dec_size is not None, \
            "dec_size is a mandatory ByteOffset init argument for decompression"

        events = []  # (name, event) pairs, kept only when profiling is enabled
        with self.sem:
            len_raw = numpy.int32(len(raw))
            if len_raw > self.padded_raw_size:
                # Input stream larger than the pre-allocated buffers: grow them.
                wg = self.block_size
                self.raw_size = int(len(raw))
                # Round up to a multiple of wg
                # (assumes wg is a power of two -- TODO confirm)
                self.padded_raw_size = (self.raw_size + wg - 1) & ~(wg - 1)
                logger.info("increase raw buffer size to %s", self.padded_raw_size)
                buffers = {
                    "raw": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int8),
                    "mask": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int32),
                    "exceptions": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int32),
                    "values": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int32),
                }
                self.cl_mem.update(buffers)
            else:
                wg = self.block_size

            # Push the compressed stream to the device (non-blocking copy)
            evt = pyopencl.enqueue_copy(self.queue, self.cl_mem["raw"].data,
                                        raw,
                                        is_blocking=False)
            events.append(EventDescription("copy raw H -> D", evt))
            # Zero the mask buffer and the exception counter on the device
            evt = self.kernels.fill_int_mem(self.queue, (self.padded_raw_size,), (wg,),
                                            self.cl_mem["mask"].data,
                                            numpy.int32(self.padded_raw_size),
                                            numpy.int32(0),
                                            numpy.int32(0))
            events.append(EventDescription("memset mask", evt))
            evt = self.kernels.fill_int_mem(self.queue, (1,), (1,),
                                            self.cl_mem["counter"].data,
                                            numpy.int32(1),
                                            numpy.int32(0),
                                            numpy.int32(0))
            events.append(EventDescription("memset counter", evt))
            # Locate multi-byte (escaped) positions; plain 1-byte deltas are
            # expanded into "values", escapes are recorded in "exceptions".
            evt = self.kernels.mark_exceptions(self.queue, (self.padded_raw_size,), (wg,),
                                               self.cl_mem["raw"].data,
                                               len_raw,
                                               numpy.int32(self.raw_size),
                                               self.cl_mem["mask"].data,
                                               self.cl_mem["values"].data,
                                               self.cl_mem["counter"].data,
                                               self.cl_mem["exceptions"].data)
            events.append(EventDescription("mark exceptions", evt))
            # Read back how many exceptions were found (needs evt.wait below)
            nb_exceptions = numpy.empty(1, dtype=numpy.int32)
            evt = pyopencl.enqueue_copy(self.queue, nb_exceptions, self.cl_mem["counter"].data,
                                        is_blocking=False)
            events.append(EventDescription("copy counter D -> H", evt))
            evt.wait()
            nbexc = int(nb_exceptions[0])
            if nbexc == 0:
                logger.info("nbexc %i", nbexc)
            else:
                # Decode the escaped (2/4-byte) deltas, one work-item each
                evt = self.kernels.treat_exceptions(self.queue, (nbexc,), (1,),
                                                    self.cl_mem["raw"].data,
                                                    len_raw,
                                                    self.cl_mem["mask"].data,
                                                    self.cl_mem["exceptions"].data,
                                                    self.cl_mem["values"].data
                                                    )
                events.append(EventDescription("treat_exceptions", evt))

            #self.cl_mem["copy_values"] = self.cl_mem["values"].copy()
            #self.cl_mem["copy_mask"] = self.cl_mem["mask"].copy()
            # Joint cumulative sum over deltas and positions (see _init_double_scan)
            evt = self.kernels.scan(self.cl_mem["values"],
                                    self.cl_mem["mask"],
                                    queue=self.queue,
                                    size=int(len_raw),
                                    wait_for=(evt,))
            events.append(EventDescription("double scan", evt))
            #evt.wait()
            # Pick the output array and the matching typed copy kernel
            if out is not None:
                if out.dtype == numpy.float32:
                    copy_results = self.kernels.copy_result_float
                else:
                    copy_results = self.kernels.copy_result_int
            else:
                if as_float:
                    out = self.cl_mem["data_float"]
                    copy_results = self.kernels.copy_result_float
                else:
                    out = self.cl_mem["data_int"]
                    copy_results = self.kernels.copy_result_int
            # Scatter the scanned values into the (possibly casted) output
            evt = copy_results(self.queue, (self.padded_raw_size,), (wg,),
                               self.cl_mem["values"].data,
                               self.cl_mem["mask"].data,
                               len_raw,
                               self.dec_size,
                               out.data
                               )
            events.append(EventDescription("copy_results", evt))
            #evt.wait()
            if self.profile:
                self.events += events
            return out

    # Calling the instance directly performs a decompression
    __call__ = decode
+
+ def _init_compression_scan(self):
+ """Initialize CBF compression scan kernels"""
+ preamble = """
+ int compressed_size(int diff) {
+ int abs_diff = abs(diff);
+
+ if (abs_diff < 128) {
+ return 1;
+ }
+ else if (abs_diff < 32768) {
+ return 3;
+ }
+ else {
+ return 7;
+ }
+ }
+
+ void write(const int index,
+ const int diff,
+ global char *output) {
+ int abs_diff = abs(diff);
+
+ if (abs_diff < 128) {
+ output[index] = (char) diff;
+ }
+ else if (abs_diff < 32768) {
+ output[index] = -128;
+ output[index + 1] = (char) (diff >> 0);
+ output[index + 2] = (char) (diff >> 8);
+ }
+ else {
+ output[index] = -128;
+ output[index + 1] = 0;
+ output[index + 2] = -128;
+ output[index + 3] = (char) (diff >> 0);
+ output[index + 4] = (char) (diff >> 8);
+ output[index + 5] = (char) (diff >> 16);
+ output[index + 6] = (char) (diff >> 24);
+ }
+ }
+ """
+ arguments = "__global const int *data, __global char *compressed, __global int *size"
+ input_expr = "compressed_size((i == 0) ? data[0] : (data[i] - data[i - 1]))"
+ scan_expr = "a+b"
+ neutral = "0"
+ output_statement = """
+ if (prev_item == 0) { // 1st thread store compressed data size
+ size[0] = last_item;
+ }
+ write(prev_item, (i == 0) ? data[0] : (data[i] - data[i - 1]), compressed);
+ """
+
+ if self.block_size >= 64:
+ knl = GenericScanKernel(self.ctx,
+ dtype=numpy.int32,
+ preamble=preamble,
+ arguments=arguments,
+ input_expr=input_expr,
+ scan_expr=scan_expr,
+ neutral=neutral,
+ output_statement=output_statement)
+ else: # MacOS on CPU
+ knl = GenericDebugScanKernel(self.ctx,
+ dtype=numpy.int32,
+ preamble=preamble,
+ arguments=arguments,
+ input_expr=input_expr,
+ scan_expr=scan_expr,
+ neutral=neutral,
+ output_statement=output_statement)
+ return knl
+
    def encode(self, data, out=None):
        """Compress data to CBF.

        :param data: The data to compress as a numpy array
            (or a pyopencl Array) of int32.
        :type data: Union[numpy.ndarray, pyopencl.array.Array]
        :param pyopencl.array out:
            pyopencl array of int8 in which to store the result.
            The array should be large enough to store the compressed data.
        :return: The compressed data as a pyopencl array.
            If out is provided, this array shares the backing buffer,
            but has the exact size of the compressed data and the queue
            of the ByteOffset instance.
        :rtype: pyopencl.array
        :raises ValueError: if out array is not large enough
        """

        events = []  # (name, event) pairs, kept only when profiling is enabled
        with self.sem:
            if isinstance(data, pyopencl.array.Array):
                d_data = data  # Uses provided array

            else:  # Copy data to device
                data = numpy.ascontiguousarray(data, dtype=numpy.int32).ravel()

                # Make sure data array exists and is large enough
                if ("data_input" not in self.cl_mem or
                        self.cl_mem["data_input"].size < data.size):
                    logger.info("increase data input buffer size to %s", data.size)
                    self.cl_mem.update({
                        "data_input": pyopencl.array.empty(self.queue,
                                                           data.size,
                                                           dtype=numpy.int32)})
                d_data = self.cl_mem["data_input"]

                evt = pyopencl.enqueue_copy(
                    self.queue, d_data.data, data, is_blocking=False)
                events.append(EventDescription("copy data H -> D", evt))

            # Make sure compressed array exists and is large enough:
            # 7 bytes is the worst case per encoded delta (see write() kernel)
            compressed_size = d_data.size * 7
            if ("compressed" not in self.cl_mem or
                    self.cl_mem["compressed"].size < compressed_size):
                logger.info("increase compressed buffer size to %s", compressed_size)
                self.cl_mem.update({
                    "compressed": pyopencl.array.empty(self.queue,
                                                       compressed_size,
                                                       dtype=numpy.int8)})
            d_compressed = self.cl_mem["compressed"]
            d_size = self.cl_mem["counter"]  # Shared with decompression

            # Single scan pass: computes write offsets and emits the bytes
            evt = self.kernels.compression_scan(d_data, d_compressed, d_size)
            events.append(EventDescription("compression scan", evt))
            # Blocking read of the total compressed byte count
            byte_count = int(d_size.get()[0])

            if out is None:
                # Create out array from a sub-region of the compressed buffer
                out = pyopencl.array.Array(
                    self.queue,
                    shape=(byte_count,),
                    dtype=numpy.int8,
                    allocator=functools.partial(
                        d_compressed.base_data.get_sub_region,
                        d_compressed.offset))

            elif out.size < byte_count:
                raise ValueError(
                    "Provided output buffer is not large enough: "
                    "requires %d bytes, got %d" % (byte_count, out.size))

            else:  # out.size >= byte_count
                # Create an array with a sub-region of out and this class queue
                out = pyopencl.array.Array(
                    self.queue,
                    shape=(byte_count,),
                    dtype=numpy.int8,
                    allocator=functools.partial(out.base_data.get_sub_region,
                                                out.offset))

            # NOTE(review): when out was None, out aliases d_compressed's
            # buffer, so this D -> D copy targets its own source region --
            # presumably a harmless no-op; confirm the driver tolerates it.
            evt = pyopencl.enqueue_copy(self.queue, out.data, d_compressed.data,
                                        byte_count=byte_count)
            events.append(
                EventDescription("copy D -> D: internal -> out", evt))

            if self.profile:
                self.events += events

            return out
+
+ def encode_to_bytes(self, data):
+ """Compresses data to CBF and returns compressed data as bytes.
+
+ Usage:
+
+ Provided an image (`image`) stored as a numpy array of int32,
+ first, create a byte offset compression/decompression object:
+
+ >>> from silx.opencl.codec.byte_offset import ByteOffset
+ >>> byte_offset_codec = ByteOffset()
+
+ Then, compress an image into bytes:
+
+ >>> compressed = byte_offset_codec.encode_to_bytes(image)
+
+ :param data: The data to compress as a numpy array
+ (or a pyopencl Array) of int32.
+ :type data: Union[numpy.ndarray, pyopencl.array.Array]
+ :return: The compressed data as bytes.
+ :rtype: bytes
+ """
+ compressed_array = self.encode(data)
+ return compressed_array.get().tobytes()