diff options
Diffstat (limited to 'src/silx/opencl/codec/byte_offset.py')
-rw-r--r-- | src/silx/opencl/codec/byte_offset.py | 332 |
1 file changed, 203 insertions, 129 deletions
diff --git a/src/silx/opencl/codec/byte_offset.py b/src/silx/opencl/codec/byte_offset.py index e497a73..e3df9b2 100644 --- a/src/silx/opencl/codec/byte_offset.py +++ b/src/silx/opencl/codec/byte_offset.py @@ -3,7 +3,7 @@ # Project: Sift implementation in Python + OpenCL # https://github.com/silx-kit/silx # -# Copyright (C) 2013-2020 European Synchrotron Radiation Facility, Grenoble, France +# Copyright (C) 2013-2023 European Synchrotron Radiation Facility, Grenoble, France # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation @@ -45,10 +45,12 @@ from ..common import ocl, pyopencl from ..processing import BufferDescription, EventDescription, OpenclProcessing import logging + logger = logging.getLogger(__name__) if pyopencl: import pyopencl.version + if pyopencl.version.VERSION < (2016, 0): from pyopencl.scan import GenericScanKernel, GenericDebugScanKernel else: @@ -61,23 +63,36 @@ else: class ByteOffset(OpenclProcessing): """Perform the byte offset compression/decompression on the GPU - See :class:`OpenclProcessing` for optional arguments description. - - :param int raw_size: - Size of the raw stream for decompression. - It can be (slightly) larger than the array. - :param int dec_size: - Size of the decompression output array - (mandatory for decompression) - """ - - def __init__(self, raw_size=None, dec_size=None, - ctx=None, devicetype="all", - platformid=None, deviceid=None, - block_size=None, profile=False): - OpenclProcessing.__init__(self, ctx=ctx, devicetype=devicetype, - platformid=platformid, deviceid=deviceid, - block_size=block_size, profile=profile) + See :class:`OpenclProcessing` for optional arguments description. + + :param int raw_size: + Size of the raw stream for decompression. + It can be (slightly) larger than the array. 
+ :param int dec_size: + Size of the decompression output array + (mandatory for decompression) + """ + + def __init__( + self, + raw_size=None, + dec_size=None, + ctx=None, + devicetype="all", + platformid=None, + deviceid=None, + block_size=None, + profile=False, + ): + OpenclProcessing.__init__( + self, + ctx=ctx, + devicetype=devicetype, + platformid=platformid, + deviceid=deviceid, + block_size=block_size, + profile=profile, + ) if self.block_size is None: self.block_size = self.device.max_work_group_size wg = self.block_size @@ -94,7 +109,9 @@ class ByteOffset(OpenclProcessing): BufferDescription("raw", self.padded_raw_size, numpy.int8, None), BufferDescription("mask", self.padded_raw_size, numpy.int32, None), BufferDescription("values", self.padded_raw_size, numpy.int32, None), - BufferDescription("exceptions", self.padded_raw_size, numpy.int32, None) + BufferDescription( + "exceptions", self.padded_raw_size, numpy.int32, None + ), ] if dec_size is None: @@ -103,18 +120,17 @@ class ByteOffset(OpenclProcessing): self.dec_size = numpy.int32(dec_size) buffers += [ BufferDescription("data_float", self.dec_size, numpy.float32, None), - BufferDescription("data_int", self.dec_size, numpy.int32, None) + BufferDescription("data_int", self.dec_size, numpy.int32, None), ] self.allocate_buffers(buffers, use_array=True) self.compile_kernels([os.path.join("codec", "byte_offset")]) self.kernels.__setattr__("scan", self._init_double_scan()) - self.kernels.__setattr__("compression_scan", - self._init_compression_scan()) + self.kernels.__setattr__("compression_scan", self._init_compression_scan()) def _init_double_scan(self): - """"generates a double scan on indexes and values in one operation""" + """generates a double scan on indexes and values in one operation""" arguments = "__global int *value", "__global int *index" int2 = pyopencl.tools.get_or_register_dtype("int2") input_expr = "index[i]>0 ? 
(int2)(0, 0) : (int2)(value[i], 1)" @@ -123,21 +139,25 @@ class ByteOffset(OpenclProcessing): output_statement = "value[i] = item.s0; index[i+1] = item.s1;" if self.block_size > 256: - knl = GenericScanKernel(self.ctx, - dtype=int2, - arguments=arguments, - input_expr=input_expr, - scan_expr=scan_expr, - neutral=neutral, - output_statement=output_statement) + knl = GenericScanKernel( + self.ctx, + dtype=int2, + arguments=arguments, + input_expr=input_expr, + scan_expr=scan_expr, + neutral=neutral, + output_statement=output_statement, + ) else: # MacOS on CPU - knl = GenericDebugScanKernel(self.ctx, - dtype=int2, - arguments=arguments, - input_expr=input_expr, - scan_expr=scan_expr, - neutral=neutral, - output_statement=output_statement) + knl = GenericDebugScanKernel( + self.ctx, + dtype=int2, + arguments=arguments, + input_expr=input_expr, + scan_expr=scan_expr, + neutral=neutral, + output_statement=output_statement, + ) return knl def decode(self, raw, as_float=False, out=None): @@ -150,8 +170,9 @@ class ByteOffset(OpenclProcessing): :return: The decompressed image as an pyopencl array. 
:rtype: pyopencl.array """ - assert self.dec_size is not None, \ - "dec_size is a mandatory ByteOffset init argument for decompression" + assert ( + self.dec_size is not None + ), "dec_size is a mandatory ByteOffset init argument for decompression" events = [] with self.sem: @@ -162,67 +183,96 @@ class ByteOffset(OpenclProcessing): self.padded_raw_size = (self.raw_size + wg - 1) & ~(wg - 1) logger.info("increase raw buffer size to %s", self.padded_raw_size) buffers = { - "raw": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int8), - "mask": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int32), - "exceptions": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int32), - "values": pyopencl.array.empty(self.queue, self.padded_raw_size, dtype=numpy.int32), - } + "raw": pyopencl.array.empty( + self.queue, self.padded_raw_size, dtype=numpy.int8 + ), + "mask": pyopencl.array.empty( + self.queue, self.padded_raw_size, dtype=numpy.int32 + ), + "exceptions": pyopencl.array.empty( + self.queue, self.padded_raw_size, dtype=numpy.int32 + ), + "values": pyopencl.array.empty( + self.queue, self.padded_raw_size, dtype=numpy.int32 + ), + } self.cl_mem.update(buffers) else: wg = self.block_size - evt = pyopencl.enqueue_copy(self.queue, self.cl_mem["raw"].data, - raw, - is_blocking=False) + evt = pyopencl.enqueue_copy( + self.queue, self.cl_mem["raw"].data, raw, is_blocking=False + ) events.append(EventDescription("copy raw H -> D", evt)) - evt = self.kernels.fill_int_mem(self.queue, (self.padded_raw_size,), (wg,), - self.cl_mem["mask"].data, - numpy.int32(self.padded_raw_size), - numpy.int32(0), - numpy.int32(0)) + evt = self.kernels.fill_int_mem( + self.queue, + (self.padded_raw_size,), + (wg,), + self.cl_mem["mask"].data, + numpy.int32(self.padded_raw_size), + numpy.int32(0), + numpy.int32(0), + ) events.append(EventDescription("memset mask", evt)) - evt = self.kernels.fill_int_mem(self.queue, (1,), (1,), - 
self.cl_mem["counter"].data, - numpy.int32(1), - numpy.int32(0), - numpy.int32(0)) + evt = self.kernels.fill_int_mem( + self.queue, + (1,), + (1,), + self.cl_mem["counter"].data, + numpy.int32(1), + numpy.int32(0), + numpy.int32(0), + ) events.append(EventDescription("memset counter", evt)) - evt = self.kernels.mark_exceptions(self.queue, (self.padded_raw_size,), (wg,), - self.cl_mem["raw"].data, - len_raw, - numpy.int32(self.raw_size), - self.cl_mem["mask"].data, - self.cl_mem["values"].data, - self.cl_mem["counter"].data, - self.cl_mem["exceptions"].data) + evt = self.kernels.mark_exceptions( + self.queue, + (self.padded_raw_size,), + (wg,), + self.cl_mem["raw"].data, + len_raw, + numpy.int32(self.raw_size), + self.cl_mem["mask"].data, + self.cl_mem["values"].data, + self.cl_mem["counter"].data, + self.cl_mem["exceptions"].data, + ) events.append(EventDescription("mark exceptions", evt)) nb_exceptions = numpy.empty(1, dtype=numpy.int32) - evt = pyopencl.enqueue_copy(self.queue, nb_exceptions, self.cl_mem["counter"].data, - is_blocking=False) + evt = pyopencl.enqueue_copy( + self.queue, + nb_exceptions, + self.cl_mem["counter"].data, + is_blocking=False, + ) events.append(EventDescription("copy counter D -> H", evt)) evt.wait() nbexc = int(nb_exceptions[0]) if nbexc == 0: logger.info("nbexc %i", nbexc) else: - evt = self.kernels.treat_exceptions(self.queue, (nbexc,), (1,), - self.cl_mem["raw"].data, - len_raw, - self.cl_mem["mask"].data, - self.cl_mem["exceptions"].data, - self.cl_mem["values"].data - ) + evt = self.kernels.treat_exceptions( + self.queue, + (nbexc,), + (1,), + self.cl_mem["raw"].data, + len_raw, + self.cl_mem["mask"].data, + self.cl_mem["exceptions"].data, + self.cl_mem["values"].data, + ) events.append(EventDescription("treat_exceptions", evt)) - #self.cl_mem["copy_values"] = self.cl_mem["values"].copy() - #self.cl_mem["copy_mask"] = self.cl_mem["mask"].copy() - evt = self.kernels.scan(self.cl_mem["values"], - self.cl_mem["mask"], - 
queue=self.queue, - size=int(len_raw), - wait_for=(evt,)) + # self.cl_mem["copy_values"] = self.cl_mem["values"].copy() + # self.cl_mem["copy_mask"] = self.cl_mem["mask"].copy() + evt = self.kernels.scan( + self.cl_mem["values"], + self.cl_mem["mask"], + queue=self.queue, + size=int(len_raw), + wait_for=(evt,), + ) events.append(EventDescription("double scan", evt)) - #evt.wait() + # evt.wait() if out is not None: if out.dtype == numpy.float32: copy_results = self.kernels.copy_result_float @@ -235,15 +285,18 @@ class ByteOffset(OpenclProcessing): else: out = self.cl_mem["data_int"] copy_results = self.kernels.copy_result_int - evt = copy_results(self.queue, (self.padded_raw_size,), (wg,), - self.cl_mem["values"].data, - self.cl_mem["mask"].data, - len_raw, - self.dec_size, - out.data - ) + evt = copy_results( + self.queue, + (self.padded_raw_size,), + (wg,), + self.cl_mem["values"].data, + self.cl_mem["mask"].data, + len_raw, + self.dec_size, + out.data, + ) events.append(EventDescription("copy_results", evt)) - #evt.wait() + # evt.wait() if self.profile: self.events += events return out @@ -291,7 +344,9 @@ class ByteOffset(OpenclProcessing): } } """ - arguments = "__global const int *data, __global char *compressed, __global int *size" + arguments = ( + "__global const int *data, __global char *compressed, __global int *size" + ) input_expr = "compressed_size((i == 0) ? 
data[0] : (data[i] - data[i - 1]))" scan_expr = "a+b" neutral = "0" @@ -303,23 +358,27 @@ class ByteOffset(OpenclProcessing): """ if self.block_size >= 64: - knl = GenericScanKernel(self.ctx, - dtype=numpy.int32, - preamble=preamble, - arguments=arguments, - input_expr=input_expr, - scan_expr=scan_expr, - neutral=neutral, - output_statement=output_statement) + knl = GenericScanKernel( + self.ctx, + dtype=numpy.int32, + preamble=preamble, + arguments=arguments, + input_expr=input_expr, + scan_expr=scan_expr, + neutral=neutral, + output_statement=output_statement, + ) else: # MacOS on CPU - knl = GenericDebugScanKernel(self.ctx, - dtype=numpy.int32, - preamble=preamble, - arguments=arguments, - input_expr=input_expr, - scan_expr=scan_expr, - neutral=neutral, - output_statement=output_statement) + knl = GenericDebugScanKernel( + self.ctx, + dtype=numpy.int32, + preamble=preamble, + arguments=arguments, + input_expr=input_expr, + scan_expr=scan_expr, + neutral=neutral, + output_statement=output_statement, + ) return knl def encode(self, data, out=None): @@ -348,28 +407,39 @@ class ByteOffset(OpenclProcessing): data = numpy.ascontiguousarray(data, dtype=numpy.int32).ravel() # Make sure data array exists and is large enough - if ("data_input" not in self.cl_mem or - self.cl_mem["data_input"].size < data.size): + if ( + "data_input" not in self.cl_mem + or self.cl_mem["data_input"].size < data.size + ): logger.info("increase data input buffer size to %s", data.size) - self.cl_mem.update({ - "data_input": pyopencl.array.empty(self.queue, - data.size, - dtype=numpy.int32)}) + self.cl_mem.update( + { + "data_input": pyopencl.array.empty( + self.queue, data.size, dtype=numpy.int32 + ) + } + ) d_data = self.cl_mem["data_input"] evt = pyopencl.enqueue_copy( - self.queue, d_data.data, data, is_blocking=False) + self.queue, d_data.data, data, is_blocking=False + ) events.append(EventDescription("copy data H -> D", evt)) # Make sure compressed array exists and is large enough 
compressed_size = d_data.size * 7 - if ("compressed" not in self.cl_mem or - self.cl_mem["compressed"].size < compressed_size): + if ( + "compressed" not in self.cl_mem + or self.cl_mem["compressed"].size < compressed_size + ): logger.info("increase compressed buffer size to %s", compressed_size) - self.cl_mem.update({ - "compressed": pyopencl.array.empty(self.queue, - compressed_size, - dtype=numpy.int8)}) + self.cl_mem.update( + { + "compressed": pyopencl.array.empty( + self.queue, compressed_size, dtype=numpy.int8 + ) + } + ) d_compressed = self.cl_mem["compressed"] d_size = self.cl_mem["counter"] # Shared with decompression @@ -384,13 +454,15 @@ class ByteOffset(OpenclProcessing): shape=(byte_count,), dtype=numpy.int8, allocator=functools.partial( - d_compressed.base_data.get_sub_region, - d_compressed.offset)) + d_compressed.base_data.get_sub_region, d_compressed.offset + ), + ) elif out.size < byte_count: raise ValueError( "Provided output buffer is not large enough: " - "requires %d bytes, got %d" % (byte_count, out.size)) + "requires %d bytes, got %d" % (byte_count, out.size) + ) else: # out.size >= byte_count # Create an array with a sub-region of out and this class queue @@ -398,13 +470,15 @@ class ByteOffset(OpenclProcessing): self.queue, shape=(byte_count,), dtype=numpy.int8, - allocator=functools.partial(out.base_data.get_sub_region, - out.offset)) + allocator=functools.partial( + out.base_data.get_sub_region, out.offset + ), + ) - evt = pyopencl.enqueue_copy(self.queue, out.data, d_compressed.data, - byte_count=byte_count) - events.append( - EventDescription("copy D -> D: internal -> out", evt)) + evt = pyopencl.enqueue_copy( + self.queue, out.data, d_compressed.data, byte_count=byte_count + ) + events.append(EventDescription("copy D -> D: internal -> out", evt)) if self.profile: self.events += events |