From bfa4dba15485b4192f8bbe13345e9658c97ecf76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Picca=20Fr=C3=A9d=C3=A9ric-Emmanuel?= Date: Sat, 7 Oct 2017 07:59:01 +0200 Subject: New upstream version 0.6.0+dfsg --- silx/io/specfile.pyx | 1268 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1268 insertions(+) create mode 100644 silx/io/specfile.pyx (limited to 'silx/io/specfile.pyx') diff --git a/silx/io/specfile.pyx b/silx/io/specfile.pyx new file mode 100644 index 0000000..35f425b --- /dev/null +++ b/silx/io/specfile.pyx @@ -0,0 +1,1268 @@ +# coding: utf-8 +# /*########################################################################## +# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +""" +This module is a cython binding to wrap the C SpecFile library, to access +SpecFile data within a python program. + +Documentation for the original C library SpecFile can be found on the ESRF +website: +`The manual for the SpecFile Library `_ + +Examples +======== + +Start by importing :class:`SpecFile` and instantiate it: + +.. code-block:: python + + from silx.io.specfile import SpecFile + + sf = SpecFile("test.dat") + +A :class:`SpecFile` instance can be accessed like a dictionary to obtain a +:class:`Scan` instance. + +If the key is a string representing two values +separated by a dot (e.g. ``"1.2"``), they will be treated as the scan number +(``#S`` header line) and the scan order:: + + # get second occurrence of scan "#S 1" + myscan = sf["1.2"] + + # access scan data as a numpy array + nlines, ncolumns = myscan.data.shape + +If the key is an integer, it will be treated as a 0-based index:: + + first_scan = sf[0] + second_scan = sf[1] + +It is also possible to browse through all scans using :class:`SpecFile` as +an iterator:: + + for scan in sf: + print(scan.scan_header_dict['S']) + +MCA spectra can be selectively loaded using an instance of :class:`MCA` +provided by :class:`Scan`:: + + # Only one MCA spectrum is loaded in memory + second_mca = first_scan.mca[1] + + # Iterating trough all MCA spectra in a scan: + for mca_data in first_scan.mca: + print(sum(mca_data)) + +Classes +======= + +- :class:`SpecFile` +- :class:`Scan` +- :class:`MCA` + +Exceptions +========== + +- :class:`SfError` +- :class:`SfErrMemoryAlloc` +- :class:`SfErrFileOpen` +- :class:`SfErrFileClose` +- :class:`SfErrFileRead` +- :class:`SfErrFileWrite` +- :class:`SfErrLineNotFound` +- :class:`SfErrScanNotFound` +- :class:`SfErrHeaderNotFound` +- :class:`SfErrLabelNotFound` +- :class:`SfErrMotorNotFound` +- :class:`SfErrPositionNotFound` +- :class:`SfErrLineEmpty` +- :class:`SfErrUserNotFound` +- :class:`SfErrColNotFound` +- :class:`SfErrMcaNotFound` + +""" + +__authors__ = ["P. Knobel"] +__license__ = "MIT" +__date__ = "11/08/2017" + +import os.path +import logging +import numpy +import re +import sys + +_logger = logging.getLogger(__name__) + +cimport numpy +cimport cython +from libc.stdlib cimport free + +cimport specfile_wrapper + +# hack to avoid C compiler warnings about unused functions in the NumPy header files +# Sources: Cython test suite. +cdef extern from *: + bint FALSE "0" + void import_array() + void import_umath() + +if FALSE: + import_array() + import_umath() + +numpy.import_array() + + +SF_ERR_NO_ERRORS = 0 +SF_ERR_FILE_OPEN = 2 +SF_ERR_SCAN_NOT_FOUND = 7 + + +# custom errors +class SfError(Exception): + """Base exception inherited by all exceptions raised when a + C function from the legacy SpecFile library returns an error + code. + """ + pass + +class SfErrMemoryAlloc(SfError, MemoryError): pass +class SfErrFileOpen(SfError, IOError): pass +class SfErrFileClose(SfError, IOError): pass +class SfErrFileRead(SfError, IOError): pass +class SfErrFileWrite(SfError, IOError): pass +class SfErrLineNotFound(SfError, KeyError): pass +class SfErrScanNotFound(SfError, IndexError): pass +class SfErrHeaderNotFound(SfError, KeyError): pass +class SfErrLabelNotFound(SfError, KeyError): pass +class SfErrMotorNotFound(SfError, KeyError): pass +class SfErrPositionNotFound(SfError, KeyError): pass +class SfErrLineEmpty(SfError, IOError): pass +class SfErrUserNotFound(SfError, KeyError): pass +class SfErrColNotFound(SfError, KeyError): pass +class SfErrMcaNotFound(SfError, IndexError): pass + + +ERRORS = { + 1: SfErrMemoryAlloc, + 2: SfErrFileOpen, + 3: SfErrFileClose, + 4: SfErrFileRead, + 5: SfErrFileWrite, + 6: SfErrLineNotFound, + 7: SfErrScanNotFound, + 8: SfErrHeaderNotFound, + 9: SfErrLabelNotFound, + 10: SfErrMotorNotFound, + 11: SfErrPositionNotFound, + 12: SfErrLineEmpty, + 13: SfErrUserNotFound, + 14: SfErrColNotFound, + 15: SfErrMcaNotFound, +} + + +class SfNoMcaError(SfError): + """Custom exception raised when ``SfNoMca()`` returns ``-1`` + """ + pass + + +class MCA(object): + """ + + :param scan: Parent Scan instance + :type scan: :class:`Scan` + + :var calibration: MCA calibration :math:`(a, b, c)` (as in + :math:`a + b x + c x²`) from ``#@CALIB`` scan header. + :type calibration: list of 3 floats, default ``[0., 1., 0.]`` + :var channels: MCA channels list from ``#@CHANN`` scan header. + In the absence of a ``#@CHANN`` header, this attribute is a list + ``[0, …, N-1]`` where ``N`` is the length of the first spectrum. + In the absence of MCA spectra, this attribute defaults to ``None``. + :type channels: list of int + + This class provides access to Multi-Channel Analysis data. A :class:`MCA` + instance can be indexed to access 1D numpy arrays representing single + MCA spectra. + + To create a :class:`MCA` instance, you must provide a parent :class:`Scan` + instance, which in turn will provide a reference to the original + :class:`SpecFile` instance:: + + sf = SpecFile("/path/to/specfile.dat") + scan2 = Scan(sf, scan_index=2) + mcas_in_scan2 = MCA(scan2) + for i in len(mcas_in_scan2): + mca_data = mcas_in_scan2[i] + ... # do some something with mca_data (1D numpy array) + + A more pythonic way to do the same work, without having to explicitly + instantiate ``scan`` and ``mcas_in_scan``, would be:: + + sf = SpecFile("specfilename.dat") + # scan2 from previous example can be referred to as sf[2] + # mcas_in_scan2 from previous example can be referred to as scan2.mca + for mca_data in sf[2].mca: + ... # do some something with mca_data (1D numpy array) + + """ + def __init__(self, scan): + self._scan = scan + + # Header dict + self._header = scan.mca_header_dict + + self.calibration = [] + """List of lists of calibration values, + one list of 3 floats per MCA device or a single list applying to + all devices """ + self._parse_calibration() + + self.channels = [] + """List of lists of channels, + one list of integers per MCA device or a single list applying to + all devices""" + self._parse_channels() + + def _parse_channels(self): + """Fill :attr:`channels`""" + # Channels list + if "CHANN" in self._header: + chann_lines = self._header["CHANN"].split("\n") + all_chann_values = [chann_line.split() for chann_line in chann_lines] + for one_line_chann_values in all_chann_values: + length, start, stop, increment = map(int, one_line_chann_values) + self.channels.append(list(range(start, stop + 1, increment))) + elif len(self): + # in the absence of #@CHANN, use shape of first MCA + length = self[0].shape[0] + start, stop, increment = (0, length - 1, 1) + self.channels.append(list(range(start, stop + 1, increment))) + + def _parse_calibration(self): + """Fill :attr:`calibration`""" + # Channels list + if "CALIB" in self._header: + calib_lines = self._header["CALIB"].split("\n") + all_calib_values = [calib_line.split() for calib_line in calib_lines] + for one_line_calib_values in all_calib_values: + self.calibration.append(list(map(float, one_line_calib_values))) + else: + # in the absence of #@calib, use default + self.calibration.append([0., 1., 0.]) + + def __len__(self): + """ + + :return: Number of mca in Scan + :rtype: int + """ + return self._scan._specfile.number_of_mca(self._scan.index) + + def __getitem__(self, key): + """Return a single MCA data line + + :param key: 0-based index of MCA within Scan + :type key: int + + :return: Single MCA + :rtype: 1D numpy array + """ + if not len(self): + raise IndexError("No MCA spectrum found in this scan") + + if isinstance(key, (int, long)): + mca_index = key + # allow negative index, like lists + if mca_index < 0: + mca_index = len(self) + mca_index + else: + raise TypeError("MCA index should be an integer (%s provided)" % + (type(key))) + + if not 0 <= mca_index < len(self): + msg = "MCA index must be in range 0-%d" % (len(self) - 1) + raise IndexError(msg) + + return self._scan._specfile.get_mca(self._scan.index, + mca_index) + + def __iter__(self): + """Return the next MCA data line each time this method is called. + + :return: Single MCA + :rtype: 1D numpy array + """ + for mca_index in range(len(self)): + yield self._scan._specfile.get_mca(self._scan.index, mca_index) + + +def _add_or_concatenate(dictionary, key, value): + """If key doesn't exist in dictionary, create a new ``key: value`` pair. + Else append/concatenate the new value to the existing one + """ + try: + if key not in dictionary: + dictionary[key] = value + else: + dictionary[key] += "\n" + value + except TypeError: + raise TypeError("Parameter value must be a string.") + + +class Scan(object): + """ + + :param specfile: Parent SpecFile from which this scan is extracted. + :type specfile: :class:`SpecFile` + :param scan_index: Unique index defining the scan in the SpecFile + :type scan_index: int + + Interface to access a SpecFile scan + + A scan is a block of descriptive header lines followed by a 2D data array. + + Following three ways of accessing a scan are equivalent:: + + sf = SpecFile("/path/to/specfile.dat") + + # Explicit class instantiation + scan2 = Scan(sf, scan_index=2) + + # 0-based index on a SpecFile object + scan2 = sf[2] + + # Using a "n.m" key (scan number starting with 1, scan order) + scan2 = sf["3.1"] + """ + def __init__(self, specfile, scan_index): + self._specfile = specfile + + self._index = scan_index + self._number = specfile.number(scan_index) + self._order = specfile.order(scan_index) + + self._scan_header_lines = self._specfile.scan_header(self._index) + self._file_header_lines = self._specfile.file_header(self._index) + + if self._file_header_lines == self._scan_header_lines: + self._file_header_lines = [] + self._header = self._file_header_lines + self._scan_header_lines + + self._scan_header_dict = {} + self._mca_header_dict = {} + for line in self._scan_header_lines: + match = re.search(r"#(\w+) *(.*)", line) + match_mca = re.search(r"#@(\w+) *(.*)", line) + if match: + hkey = match.group(1).lstrip("#").strip() + hvalue = match.group(2).strip() + _add_or_concatenate(self._scan_header_dict, hkey, hvalue) + elif match_mca: + hkey = match_mca.group(1).lstrip("#").strip() + hvalue = match_mca.group(2).strip() + _add_or_concatenate(self._mca_header_dict, hkey, hvalue) + else: + # this shouldn't happen + _logger.warning("Unable to parse scan header line " + line) + + self._labels = [] + if self.record_exists_in_hdr('L'): + try: + self._labels = self._specfile.labels(self._index) + except SfErrLineNotFound: + # SpecFile.labels raises an IndexError when encountering + # a Scan with no data, even if the header exists. + L_header = re.sub(r" {2,}", " ", # max. 2 spaces + self._scan_header_dict["L"]) + self._labels = L_header.split(" ") + + self._file_header_dict = {} + for line in self._file_header_lines: + match = re.search(r"#(\w+) *(.*)", line) + if match: + # header type + hkey = match.group(1).lstrip("#").strip() + hvalue = match.group(2).strip() + _add_or_concatenate(self._file_header_dict, hkey, hvalue) + else: + _logger.warning("Unable to parse file header line " + line) + + self._motor_names = self._specfile.motor_names(self._index) + self._motor_positions = self._specfile.motor_positions(self._index) + + self._data = None + self._mca = None + + @cython.embedsignature(False) + @property + def index(self): + """Unique scan index 0 - len(specfile)-1 + + This attribute is implemented as a read-only property as changing + its value may cause nasty side-effects (such as loading data from a + different scan without updating the header accordingly.""" + return self._index + + @cython.embedsignature(False) + @property + def number(self): + """First value on #S line (as int)""" + return self._number + + @cython.embedsignature(False) + @property + def order(self): + """Order can be > 1 if the same number is repeated in a specfile""" + return self._order + + @cython.embedsignature(False) + @property + def header(self): + """List of raw header lines (as a list of strings). + + This includes the file header, the scan header and possibly a MCA + header. + """ + return self._header + + @cython.embedsignature(False) + @property + def scan_header(self): + """List of raw scan header lines (as a list of strings). + """ + return self._scan_header_lines + + @cython.embedsignature(False) + @property + def file_header(self): + """List of raw file header lines (as a list of strings). + """ + return self._file_header_lines + + @cython.embedsignature(False) + @property + def scan_header_dict(self): + """ + Dictionary of scan header strings, keys without the leading``#`` + (e.g. ``scan_header_dict["S"]``). + Note: this does not include MCA header lines starting with ``#@``. + """ + return self._scan_header_dict + + @cython.embedsignature(False) + @property + def mca_header_dict(self): + """ + Dictionary of MCA header strings, keys without the leading ``#@`` + (e.g. ``mca_header_dict["CALIB"]``). + """ + return self._mca_header_dict + + @cython.embedsignature(False) + @property + def file_header_dict(self): + """ + Dictionary of file header strings, keys without the leading ``#`` + (e.g. ``file_header_dict["F"]``). + """ + return self._file_header_dict + + @cython.embedsignature(False) + @property + def labels(self): + """ + List of data column headers from ``#L`` scan header + """ + return self._labels + + @cython.embedsignature(False) + @property + def data(self): + """Scan data as a 2D numpy.ndarray with the usual attributes + (e.g. data.shape). + + The first index is the detector, the second index is the sample index. + """ + if self._data is None: + self._data = numpy.transpose(self._specfile.data(self._index)) + + return self._data + + @cython.embedsignature(False) + @property + def mca(self): + """MCA data in this scan. + + Each multichannel analysis is a 1D numpy array. Metadata about + MCA data is to be found in :py:attr:`mca_header`. + + :rtype: :class:`MCA` + """ + if self._mca is None: + self._mca = MCA(self) + return self._mca + + @cython.embedsignature(False) + @property + def motor_names(self): + """List of motor names from the ``#O`` file header line. + """ + return self._motor_names + + @cython.embedsignature(False) + @property + def motor_positions(self): + """List of motor positions as floats from the ``#P`` scan header line. + """ + return self._motor_positions + + def record_exists_in_hdr(self, record): + """Check whether a scan header line exists. + + This should be used before attempting to retrieve header information + using a C function that may crash with a *segmentation fault* if the + header isn't defined in the SpecFile. + + :param record: single upper case letter corresponding to the + header you want to test (e.g. ``L`` for labels) + :type record: str + + :return: True or False + :rtype: boolean + """ + for line in self._header: + if line.startswith("#" + record): + return True + return False + + def data_line(self, line_index): + """Returns data for a given line of this scan. + + .. note:: + + A data line returned by this method, corresponds to a data line + in the original specfile (a series of data points, one per + detector). In the :attr:`data` array, this line index corresponds + to the index in the second dimension (~ column) of the array. + + :param line_index: Index of data line to retrieve (starting with 0) + :type line_index: int + + :return: Line data as a 1D array of doubles + :rtype: numpy.ndarray + """ + # attribute data corresponds to a transposed version of the original + # specfile data (where detectors correspond to columns) + return self.data[:, line_index] + + def data_column_by_name(self, label): + """Returns a data column + + :param label: Label of data column to retrieve, as defined on the + ``#L`` line of the scan header. + :type label: str + + :return: Line data as a 1D array of doubles + :rtype: numpy.ndarray + """ + try: + ret = self._specfile.data_column_by_name(self._index, label) + except SfErrLineNotFound: + # Could be a "#C Scan aborted after 0 points" + _logger.warning("Cannot get data column %s in scan %d.%d", + label, self.number, self.order) + ret = numpy.empty((0, ), numpy.double) + return ret + + def motor_position_by_name(self, name): + """Returns the position for a given motor + + :param name: Name of motor, as defined on the ``#O`` line of the + file header. + :type name: str + + :return: Motor position + :rtype: float + """ + return self._specfile.motor_position_by_name(self._index, name) + + +def _string_to_char_star(string_): + """Convert a string to ASCII encoded bytes when using python3""" + if sys.version.startswith("3") and not isinstance(string_, bytes): + return bytes(string_, "ascii") + return string_ + + +def is_specfile(filename): + """Test if a file is a SPEC file, by checking if one of the first two + lines starts with *#F* (SPEC file header) or *#S* (scan header). + + :param str filename: File path + :return: *True* if file is a SPEC file, *False* if it is not a SPEC file + :rtype: bool + """ + if not os.path.isfile(filename): + return False + # test for presence of #S or #F in first two lines + f = open(filename) + for i, line in enumerate(f): + if line.startswith("#S ") or line.startswith("#F "): + f.close() + return True + if i >= 10: + break + f.close() + return False + + +cdef class SpecFile(object): + """ + + :param filename: Path of the SpecFile to read + + This class wraps the main data and header access functions of the C + SpecFile library. + """ + + cdef: + specfile_wrapper.SpecFileHandle *handle + str filename + + def __cinit__(self, filename): + cdef int error = SF_ERR_NO_ERRORS + self.handle = NULL + + if is_specfile(filename): + filename = _string_to_char_star(filename) + self.handle = specfile_wrapper.SfOpen(filename, &error) + if error: + self._handle_error(error) + else: + # handle_error takes care of raising the correct error, + # this causes the destructor to be called + self._handle_error(SF_ERR_FILE_OPEN) + + def __init__(self, filename): + if not isinstance(filename, str): + # encode unicode to str in python 2 + if sys.version_info[0] < 3: + self.filename = filename.encode() + # decode bytes to str in python 3 + elif sys.version_info[0] >= 3: + self.filename = filename.decode() + else: + self.filename = filename + + def __dealloc__(self): + """Destructor: Calls SfClose(self.handle)""" + # handle is NULL if SfOpen failed + if self.handle: + if specfile_wrapper.SfClose(self.handle): + _logger.warning("Error while closing SpecFile") + + def __len__(self): + """Return the number of scans in the SpecFile + """ + return specfile_wrapper.SfScanNo(self.handle) + + def __iter__(self): + """Return the next :class:`Scan` in a SpecFile each time this method + is called. + + This usually happens when the python built-in function ``next()`` is + called with a :class:`SpecFile` instance as a parameter, or when a + :class:`SpecFile` instance is used as an iterator (e.g. in a ``for`` + loop). + """ + for scan_index in range(len(self)): + yield Scan(self, scan_index) + + def __getitem__(self, key): + """Return a :class:`Scan` object. + + This special method is called when a :class:`SpecFile` instance is + accessed as a dictionary (e.g. ``sf[key]``). + + :param key: 0-based scan index or ``"n.m"`` key, where ``n`` is the scan + number defined on the ``#S`` header line and ``m`` is the order + :type key: int or str + + :return: Scan defined by its 0-based index or its ``"n.m"`` key + :rtype: :class:`Scan` + """ + msg = "The scan identification key can be an integer representing " + msg += "the unique scan index or a string 'N.M' with N being the scan" + msg += " number and M the order (eg '2.3')." + + if isinstance(key, int): + scan_index = key + # allow negative index, like lists + if scan_index < 0: + scan_index = len(self) + scan_index + else: + try: + (number, order) = map(int, key.split(".")) + scan_index = self.index(number, order) + except (ValueError, SfErrScanNotFound, KeyError): + # int() can raise a value error + raise KeyError(msg + "\nValid keys: '" + + "', '".join(self.keys()) + "'") + except AttributeError: + # e.g. "AttrErr: 'float' object has no attribute 'split'" + raise TypeError(msg) + + if not 0 <= scan_index < len(self): + msg = "Scan index must be in range 0-%d" % (len(self) - 1) + raise IndexError(msg) + + return Scan(self, scan_index) + + def keys(self): + """Returns list of scan keys (eg ``['1.1', '2.1',...]``). + + :return: list of scan keys + :rtype: list of strings + """ + ret_list = [] + list_of_numbers = self._list() + count = {} + + for number in list_of_numbers: + if number not in count: + count[number] = 1 + else: + count[number] += 1 + ret_list.append(u'%d.%d' % (number, count[number])) + + return ret_list + + def __contains__(self, key): + """Return ``True`` if ``key`` is a valid scan key. + Valid keys can be a string such as ``"1.1"`` or a 0-based scan index. + """ + return key in (self.keys() + list(range(len(self)))) + + def _get_error_string(self, error_code): + """Returns the error message corresponding to the error code. + + :param code: Error code + :type code: int + :return: Human readable error message + :rtype: str + """ + return ( specfile_wrapper.SfError(error_code)).decode() + + def _handle_error(self, error_code): + """Inspect error code, raise adequate error type if necessary. + + :param code: Error code + :type code: int + """ + error_message = self._get_error_string(error_code) + if error_code in ERRORS: + raise ERRORS[error_code](error_message) + + def index(self, scan_number, scan_order=1): + """Returns scan index from scan number and order. + + :param scan_number: Scan number (possibly non-unique). + :type scan_number: int + :param scan_order: Scan order. + :type scan_order: int default 1 + + :return: Unique scan index + :rtype: int + + + Scan indices are increasing from ``0`` to ``len(self)-1`` in the + order in which they appear in the file. + Scan numbers are defined by users and are not necessarily unique. + The scan order for a given scan number increments each time the scan + number appears in a given file. + """ + idx = specfile_wrapper.SfIndex(self.handle, scan_number, scan_order) + if idx == -1: + self._handle_error(SF_ERR_SCAN_NOT_FOUND) + return idx - 1 + + def number(self, scan_index): + """Returns scan number from scan index. + + :param scan_index: Unique scan index between ``0`` and + ``len(self)-1``. + :type scan_index: int + + :return: User defined scan number. + :rtype: int + """ + idx = specfile_wrapper.SfNumber(self.handle, scan_index + 1) + if idx == -1: + self._handle_error(SF_ERR_SCAN_NOT_FOUND) + return idx + + def order(self, scan_index): + """Returns scan order from scan index. + + :param scan_index: Unique scan index between ``0`` and + ``len(self)-1``. + :type scan_index: int + + :return: Scan order (sequential number incrementing each time a + non-unique occurrence of a scan number is encountered). + :rtype: int + """ + ordr = specfile_wrapper.SfOrder(self.handle, scan_index + 1) + if ordr == -1: + self._handle_error(SF_ERR_SCAN_NOT_FOUND) + return ordr + + def _list(self): + """see documentation of :meth:`list` + """ + cdef: + long *scan_numbers + int error = SF_ERR_NO_ERRORS + + scan_numbers = specfile_wrapper.SfList(self.handle, &error) + self._handle_error(error) + + ret_list = [] + for i in range(len(self)): + ret_list.append(scan_numbers[i]) + + free(scan_numbers) + return ret_list + + def list(self): + """Returns list (1D numpy array) of scan numbers in SpecFile. + + :return: list of scan numbers (from `` #S`` lines) in the same order + as in the original SpecFile (e.g ``[1, 1, 2, 3, …]``). + :rtype: numpy array + """ + # this method is overloaded in specfilewrapper to output a string + # representation of the list + return self._list() + + def data(self, scan_index): + """Returns data for the specified scan index. + + :param scan_index: Unique scan index between ``0`` and + ``len(self)-1``. + :type scan_index: int + + :return: Complete scan data as a 2D array of doubles + :rtype: numpy.ndarray + """ + cdef: + double** mydata + long* data_info + int i, j + int error = SF_ERR_NO_ERRORS + long nlines, ncolumns, regular + + sfdata_error = specfile_wrapper.SfData(self.handle, + scan_index + 1, + &mydata, + &data_info, + &error) + self._handle_error(error) + + if data_info != 0: + nlines = data_info[0] + ncolumns = data_info[1] + regular = data_info[2] + else: + nlines = 0 + ncolumns = 0 + regular = 0 + + cdef numpy.ndarray ret_array = numpy.empty((nlines, ncolumns), + dtype=numpy.double) + for i in range(nlines): + for j in range(ncolumns): + ret_array[i, j] = mydata[i][j] + + specfile_wrapper.freeArrNZ(&mydata, nlines) + free(data_info) + return ret_array + + def data_column_by_name(self, scan_index, label): + """Returns data column for the specified scan index and column label. + + :param scan_index: Unique scan index between ``0`` and + ``len(self)-1``. + :type scan_index: int + :param label: Label of data column, as defined in the ``#L`` line + of the scan header. + :type label: str + + :return: Data column as a 1D array of doubles + :rtype: numpy.ndarray + """ + cdef: + double* data_column + long i, nlines + int error = SF_ERR_NO_ERRORS + + label = _string_to_char_star(label) + + nlines = specfile_wrapper.SfDataColByName(self.handle, + scan_index + 1, + label, + &data_column, + &error) + self._handle_error(error) + + cdef numpy.ndarray ret_array = numpy.empty((nlines,), + dtype=numpy.double) + for i in range(nlines): + ret_array[i] = data_column[i] + + free(data_column) + return ret_array + + def scan_header(self, scan_index): + """Return list of scan header lines. + + :param scan_index: Unique scan index between ``0`` and + ``len(self)-1``. + :type scan_index: int + + :return: List of raw scan header lines + :rtype: list of str + """ + cdef: + char** lines + int error = SF_ERR_NO_ERRORS + + nlines = specfile_wrapper.SfHeader(self.handle, + scan_index + 1, + "", # no pattern matching + &lines, + &error) + + self._handle_error(error) + + lines_list = [] + for i in range(nlines): + line = lines[i].decode() + lines_list.append(line) + + specfile_wrapper.freeArrNZ(&lines, nlines) + return lines_list + + def file_header(self, scan_index=0): + """Return list of file header lines. + + A file header contains all lines between a ``#F`` header line and + a ``#S`` header line (start of scan). We need to specify a scan + number because there can be more than one file header in a given file. + A file header applies to all subsequent scans, until a new file + header is defined. + + :param scan_index: Unique scan index between ``0`` and + ``len(self)-1``. + :type scan_index: int + + :return: List of raw file header lines + :rtype: list of str + """ + cdef: + char** lines + int error = SF_ERR_NO_ERRORS + + nlines = specfile_wrapper.SfFileHeader(self.handle, + scan_index + 1, + "", # no pattern matching + &lines, + &error) + self._handle_error(error) + + lines_list = [] + for i in range(nlines): + line = lines[i].decode() + lines_list.append(line) + + specfile_wrapper.freeArrNZ(&lines, nlines) + return lines_list + + def columns(self, scan_index): + """Return number of columns in a scan from the ``#N`` header line + (without ``#N`` and scan number) + + :param scan_index: Unique scan index between ``0`` and + ``len(self)-1``. + :type scan_index: int + + :return: Number of columns in scan from ``#N`` line + :rtype: int + """ + cdef: + int error = SF_ERR_NO_ERRORS + + ncolumns = specfile_wrapper.SfNoColumns(self.handle, + scan_index + 1, + &error) + self._handle_error(error) + + return ncolumns + + def command(self, scan_index): + """Return ``#S`` line (without ``#S`` and scan number) + + :param scan_index: Unique scan index between ``0`` and + ``len(self)-1``. + :type scan_index: int + + :return: S line + :rtype: str + """ + cdef: + int error = SF_ERR_NO_ERRORS + + s_record = specfile_wrapper.SfCommand(self.handle, + scan_index + 1, + &error) + self._handle_error(error) + + return s_record.decode() + + def date(self, scan_index=0): + """Return date from ``#D`` line + + :param scan_index: Unique scan index between ``0`` and + ``len(self)-1``. + :type scan_index: int + + :return: Date from ``#D`` line + :rtype: str + """ + cdef: + int error = SF_ERR_NO_ERRORS + + d_line = specfile_wrapper.SfDate(self.handle, + scan_index + 1, + &error) + self._handle_error(error) + + return d_line.decode() + + def labels(self, scan_index): + """Return all labels from ``#L`` line + + :param scan_index: Unique scan index between ``0`` and + ``len(self)-1``. + :type scan_index: int + + :return: All labels from ``#L`` line + :rtype: list of strings + """ + cdef: + char** all_labels + int error = SF_ERR_NO_ERRORS + + nlabels = specfile_wrapper.SfAllLabels(self.handle, + scan_index + 1, + &all_labels, + &error) + self._handle_error(error) + + labels_list = [] + for i in range(nlabels): + labels_list.append(all_labels[i].decode()) + + specfile_wrapper.freeArrNZ(&all_labels, nlabels) + return labels_list + + def motor_names(self, scan_index=0): + """Return all motor names from ``#O`` lines + + :param scan_index: Unique scan index between ``0`` and + ``len(self)-1``.If not specified, defaults to 0 (meaning the + function returns motors names associated with the first scan). + This parameter makes a difference only if there are more than + on file header in the file, in which case the file header applies + to all following scans until a new file header appears. + :type scan_index: int + + :return: All motor names + :rtype: list of strings + """ + cdef: + char** all_motors + int error = SF_ERR_NO_ERRORS + + nmotors = specfile_wrapper.SfAllMotors(self.handle, + scan_index + 1, + &all_motors, + &error) + self._handle_error(error) + + motors_list = [] + for i in range(nmotors): + motors_list.append(all_motors[i].decode()) + + specfile_wrapper.freeArrNZ(&all_motors, nmotors) + return motors_list + + def motor_positions(self, scan_index): + """Return all motor positions + + :param scan_index: Unique scan index between ``0`` + and ``len(self)-1``. + :type scan_index: int + + :return: All motor positions + :rtype: list of double + """ + cdef: + double* motor_positions + int error = SF_ERR_NO_ERRORS + + nmotors = specfile_wrapper.SfAllMotorPos(self.handle, + scan_index + 1, + &motor_positions, + &error) + self._handle_error(error) + + motor_positions_list = [] + for i in range(nmotors): + motor_positions_list.append(motor_positions[i]) + + free(motor_positions) + return motor_positions_list + + def motor_position_by_name(self, scan_index, name): + """Return motor position + + :param scan_index: Unique scan index between ``0`` and + ``len(self)-1``. + :type scan_index: int + + :return: Specified motor position + :rtype: double + """ + cdef: + int error = SF_ERR_NO_ERRORS + + name = _string_to_char_star(name) + + motor_position = specfile_wrapper.SfMotorPosByName(self.handle, + scan_index + 1, + name, + &error) + self._handle_error(error) + + return motor_position + + def number_of_mca(self, scan_index): + """Return number of mca spectra in a scan. + + :param scan_index: Unique scan index between ``0`` and + ``len(self)-1``. + :type scan_index: int + + :return: Number of mca spectra. + :rtype: int + """ + cdef: + int error = SF_ERR_NO_ERRORS + + num_mca = specfile_wrapper.SfNoMca(self.handle, + scan_index + 1, + &error) + # error code updating isn't implemented in SfNoMCA + if num_mca == -1: + raise SfNoMcaError("Failed to retrieve number of MCA " + + "(SfNoMca returned -1)") + return num_mca + + def mca_calibration(self, scan_index): + """Return MCA calibration in the form :math:`a + b x + c x²` + + Raise a KeyError if there is no ``@CALIB`` line in the scan header. + + :param scan_index: Unique scan index between ``0`` and + ``len(self)-1``. + :type scan_index: int + + :return: MCA calibration as a list of 3 values :math:`(a, b, c)` + :rtype: list of floats + """ + cdef: + int error = SF_ERR_NO_ERRORS + double* mca_calib + + mca_calib_error = specfile_wrapper.SfMcaCalib(self.handle, + scan_index + 1, + &mca_calib, + &error) + + # error code updating isn't implemented in SfMcaCalib + if mca_calib_error: + raise KeyError("MCA calibration line (@CALIB) not found") + + mca_calib_list = [] + for i in range(3): + mca_calib_list.append(mca_calib[i]) + + free(mca_calib) + return mca_calib_list + + def get_mca(self, scan_index, mca_index): + """Return one MCA spectrum + + :param scan_index: Unique scan index between ``0`` and ``len(self)-1``. + :type scan_index: int + :param mca_index: Index of MCA in the scan + :type mca_index: int + + :return: MCA spectrum + :rtype: 1D numpy array + """ + cdef: + int error = SF_ERR_NO_ERRORS + double* mca_data + long len_mca + + len_mca = specfile_wrapper.SfGetMca(self.handle, + scan_index + 1, + mca_index + 1, + &mca_data, + &error) + self._handle_error(error) + + cdef numpy.ndarray ret_array = numpy.empty((len_mca,), + dtype=numpy.double) + for i in range(len_mca): + ret_array[i] = mca_data[i] + + free(mca_data) + return ret_array -- cgit v1.2.3