# -*- encoding: utf-8 -*- ''' This file is part of the binoculars project. The BINoculars library is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. The BINoculars library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with the hkl library. If not, see . Copyright (C) 2015-2017 Synchrotron SOLEIL L'Orme des Merisiers Saint-Aubin BP 48 91192 GIF-sur-YVETTE CEDEX Copyright (C) 2012-2015 European Synchrotron Radiation Facility Grenoble, France Authors: Willem Onderwaater Picca Frédéric-Emmanuel ''' import numpy import math import os import tables import sys from collections import namedtuple from math import cos, sin from numpy.linalg import inv from pyFAI.detectors import ALL_DETECTORS from gi.repository import Hkl from .. import backend, errors, util ############### # Projections # ############### PDataFrame = namedtuple("PDataFrame", ["pixels", "k", "ub", "R", "P"]) class realspace(backend.ProjectionBase): # scalars: mu, theta, [chi, phi, "omitted"] delta, gamR, gamT, ty, # wavelength 3x3 matrix: UB def project(self, index, pdataframe): return (pdataframe.pixels[1], pdataframe.pixels[2]) def get_axis_labels(self): return 'x', 'y' class Pixels(backend.ProjectionBase): # scalars: mu, theta, [chi, phi, "omitted"] delta, gamR, gamT, ty, # wavelength 3x3 matrix: UB def project(self, index, pdataframe): return numpy.meshgrid(numpy.arange(pdataframe.pixels[0].shape[1]), numpy.arange(pdataframe.pixels[0].shape[0])) def get_axis_labels(self): return 'x', 'y' class HKLProjection(backend.ProjectionBase): # scalars: mu, theta, [chi, phi, "omitted"] delta, gamR, gamT, ty, # wavelength 3x3 matrix: UB def project(self, index, pdataframe): # put the detector at the right position pixels, k, UB, R, P = pdataframe ki = [1, 0, 0] RUB_1 = inv(numpy.dot(R, UB)) RUB_1P = numpy.dot(RUB_1, P) kf = normalized(pixels, axis=0) hkl_f = numpy.tensordot(RUB_1P, kf, axes=1) hkl_i = numpy.dot(RUB_1, ki) hkl = hkl_f - hkl_i[:, numpy.newaxis, numpy.newaxis] h, k, l = hkl * k return (h, k, l) def get_axis_labels(self): return 'H', 'K', 'L' class HKProjection(HKLProjection): def project(self, index, pdataframe): h, k, l = super(HKProjection, self).project(index, pdataframe) return h, k def get_axis_labels(self): return 'H', 'K' class QxQyQzProjection(backend.ProjectionBase): def project(self, index, pdataframe): # put the detector at the right position pixels, k, _, R, P = pdataframe # TODO factorize with HklProjection. Here a trick in order to # compute Qx Qy Qz in the omega basis. UB = numpy.array([[2* math.pi, 0 , 0], [0 , 0 , 2* math.pi], [0 , -2 * math.pi, 0]]) UB = numpy.array([[2* math.pi, 0 , 0], [0 , 2 * math.pi , 0], [0 , 0, 2 * math.pi]]) # the ki vector should be in the NexusFile or easily extracted # from the hkl library. ki = [1, 0, 0] RUB_1 = inv(numpy.dot(R, UB)) RUB_1P = numpy.dot(RUB_1, P) kf = normalized(pixels, axis=0) hkl_f = numpy.tensordot(RUB_1P, kf, axes=1) hkl_i = numpy.dot(RUB_1, ki) hkl = hkl_f - hkl_i[:, numpy.newaxis, numpy.newaxis] qx, qy, qz = hkl * k return qx, qy, qz def get_axis_labels(self): return "Qx", "Qy", "Qz" class QparQperProjection(QxQyQzProjection): def project(self, index, pdataframe): qx, qy, qz = super(QparQperProjection, self).project(index, pdataframe) return numpy.sqrt(qx*qx + qy*qy), qz def get_axis_labels(self): return 'Qpar', 'Qper' ################### # Common methodes # ################### WRONG_ATTENUATION = -100 def get_nxclass(hfile, nxclass, path="/"): """ :param hfile: the hdf5 file. :type hfile: tables.file. :param nxclass: the nxclass to extract :type nxclass: str """ for node in hfile.walk_nodes(path): try: if nxclass == node._v_attrs['NX_class']: return node except KeyError: pass return None def as_string(node): if node.shape == (): return str(node.read()) else: return node[0][:-1] Diffractometer = namedtuple('Diffractometer', ['name', # name of the hkl diffractometer 'ub', # the UB matrix 'geometry']) # the HklGeometry def get_diffractometer(hfile): """ Construct a Diffractometer from a NeXus file """ node = get_nxclass(hfile, 'NXdiffractometer') name = as_string(node.type) ub = node.UB[:] factory = Hkl.factories()[name] geometry = factory.create_new_geometry() # wavelength = get_nxclass(hfile, 'NXmonochromator').wavelength[0] # geometry.wavelength_set(wavelength) return Diffractometer(name, ub, geometry) Sample = namedtuple("Sample", ["a", "b", "c", "alpha", "beta", "gamma", "ux", "uy", "uz", "sample"]) def get_sample(hfile): # hkl sample a = b = c = 1.54 alpha = beta = gamma = 90 ux = uy = uz = 0 sample = Hkl.Sample.new("test") lattice = Hkl.Lattice.new(a, b, c, math.radians(alpha), math.radians(beta), math.radians(gamma)) sample.lattice_set(lattice) parameter = sample.ux_get() parameter.value_set(ux, Hkl.UnitEnum.USER) sample.ux_set(parameter) parameter = sample.uy_get() parameter.value_set(uy, Hkl.UnitEnum.USER) sample.uy_set(parameter) parameter = sample.uz_get() parameter.value_set(uz, Hkl.UnitEnum.USER) sample.uz_set(parameter) return Sample(1.54, 1.54, 1.54, 90, 90, 90, 0, 0, 0, sample) Detector = namedtuple("Detector", ["name", "detector"]) def get_detector(hfile): detector = Hkl.Detector.factory_new(Hkl.DetectorType(0)) return Detector("imxpads140", detector) Source = namedtuple("Source", ["wavelength"]) def get_source(hfile): wavelength = get_nxclass(hfile, 'NXmonochromator').wavelength[0] return Source(wavelength) DataFrame = namedtuple("DataFrame", ["diffractometer", "sample", "detector", "source", "h5_nodes"]) def dataframes(hfile, data_path=None): diffractometer = get_diffractometer(hfile) sample = get_sample(hfile) detector = get_detector(hfile) source = get_source(hfile) for group in hfile.get_node('/'): scan_data = group._f_get_child("scan_data") # now instantiate the pytables objects h5_nodes = {} for key, hitem in data_path.items(): try: child = scan_data._f_get_child(hitem.name) except tables.exceptions.NoSuchNodeError: if hitem.optional: child = None else: raise h5_nodes[key] = child yield DataFrame(diffractometer, sample, detector, source, h5_nodes) def get_ki(wavelength): """ for now the direction is always along x """ TAU = 2 * math.pi return numpy.array([TAU / wavelength, 0, 0]) def normalized(a, axis=-1, order=2): l2 = numpy.atleast_1d(numpy.linalg.norm(a, order, axis)) l2[l2 == 0] = 1 return a / numpy.expand_dims(l2, axis) def hkl_matrix_to_numpy(m): M = numpy.empty((3, 3)) for i in range(3): for j in range(3): M[i, j] = m.get(i, j) return M def M(theta, u): """ :param theta: the axis value in radian :type theta: float :param u: the axis vector [x, y, z] :type u: [float, float, float] :return: the rotation matrix :rtype: numpy.ndarray (3, 3) """ c = cos(theta) one_minus_c = 1 - c s = sin(theta) return numpy.array([[c + u[0]**2 * one_minus_c, u[0] * u[1] * one_minus_c - u[2] * s, u[0] * u[2] * one_minus_c + u[1] * s], [u[0] * u[1] * one_minus_c + u[2] * s, c + u[1]**2 * one_minus_c, u[1] * u[2] * one_minus_c - u[0] * s], [u[0] * u[2] * one_minus_c - u[1] * s, u[1] * u[2] * one_minus_c + u[0] * s, c + u[2]**2 * one_minus_c]]) ################## # Input Backends # ################## class SIXS(backend.InputBase): # OFFICIAL API dbg_scanno = None dbg_pointno = None def generate_jobs(self, command): scans = util.parse_multi_range(','.join(command).replace(' ', ',')) if not len(scans): sys.stderr.write('error: no scans selected, nothing to do\n') for scanno in scans: util.status('processing scan {0}...'.format(scanno)) if self.config.pr: pointcount = self.config.pr[1] - self.config.pr[0] + 1 start = self.config.pr[0] else: start = 0 pointcount = self.get_pointcount(scanno) if pointcount > self.config.target_weight * 1.4: for s in util.chunk_slicer(pointcount, self.config.target_weight): yield backend.Job(scan=scanno, firstpoint=start+s.start, lastpoint=start+s.stop-1, weight=s.stop-s.start) else: yield backend.Job(scan=scanno, firstpoint=start, lastpoint=start+pointcount-1, weight=pointcount) def process_job(self, job): super(SIXS, self).process_job(job) with tables.open_file(self.get_filename(job.scan), 'r') as scan: self.metadict = dict() try: for dataframe in dataframes(scan, self.HPATH): pixels = self.get_pixels(dataframe.detector) detector = ALL_DETECTORS[dataframe.detector.name]() mask = detector.mask.astype(numpy.bool) maskmatrix = load_matrix(self.config.maskmatrix) if maskmatrix is not None: mask = numpy.bitwise_or(mask, maskmatrix) for index in range(job.firstpoint, job.lastpoint + 1): yield self.process_image(index, dataframe, pixels, mask) util.statuseol() except Exception as exc: exc.args = errors.addmessage(exc.args, ', An error occured for scan {0} at point {1}. See above for more information'.format(self.dbg_scanno, self.dbg_pointno)) raise self.metadata.add_section('sixs_backend', self.metadict) def parse_config(self, config): super(SIXS, self).parse_config(config) self.config.xmask = util.parse_multi_range(config.pop('xmask', None)) # Optional, select a subset of the image range in the x direction. all by default self.config.ymask = util.parse_multi_range(config.pop('ymask', None)) # Optional, select a subset of the image range in the y direction. all by default self.config.nexusdir = config.pop('nexusdir', None) # location of the nexus files (take precedence on nexusfile) self.config.nexusfile = config.pop('nexusfile', None) # Location of the specfile self.config.pr = config.pop('pr', None) # Optional, all range by default if self.config.xmask is None: self.config.xmask = slice(None) if self.config.ymask is None: self.config.ymask = slice(None) if self.config.pr: self.config.pr = util.parse_tuple(self.config.pr, length=2, type=int) self.config.sdd = float(config.pop('sdd')) # sample to detector distance (mm) self.config.centralpixel = util.parse_tuple(config.pop('centralpixel'), length=2, type=int) # x,y self.config.maskmatrix = config.pop('maskmatrix', None) # Optional, if supplied pixels where the mask is 0 will be removed self.config.detrot = config.pop('detrot', None) # detector rotation around x (1, 0, 0) if self.config.detrot is not None: try: self.config.detrot = float(self.config.detrot) except ValueError: self.config.detrot = None # attenuation_coefficient (Optional) attenuation_coefficient = config.pop('attenuation_coefficient', None) if attenuation_coefficient is not None: try: self.config.attenuation_coefficient = float(attenuation_coefficient) except ValueError: self.config.attenuation_coefficient = None else: self.config.attenuation_coefficient = None def get_destination_options(self, command): if not command: return False command = ','.join(command).replace(' ', ',') scans = util.parse_multi_range(command) return dict(first=min(scans), last=max(scans), range=','.join(str(scan) for scan in scans)) # CONVENIENCE FUNCTIONS def get_filename(self, scanno): filename = None if self.config.nexusdir: dirname = self.config.nexusdir files = [f for f in os.listdir(dirname) if str(scanno).zfill(5) in f] if files is not []: filename = os.path.join(dirname, files[0]) else: filename = self.config.nexusfile.format(scanno=str(scanno).zfill(5)) if not os.path.exists(filename): raise errors.ConfigError('nexus filename does not exist: {0}'.format(filename)) return filename @staticmethod def apply_mask(data, xmask, ymask): roi = data[ymask, :] return roi[:, xmask] HItem = namedtuple("HItem", ["name", "optional"]) class FlyScanUHV(SIXS): HPATH = { "image": HItem("xpad_image", False), "mu": HItem("UHV_MU", False), "omega": HItem("UHV_OMEGA", False), "delta": HItem("UHV_DELTA", False), "gamma": HItem("UHV_GAMMA", False), "attenuation": HItem("attenuation", True), } def get_pointcount(self, scanno): # just open the file in order to extract the number of step with tables.open_file(self.get_filename(scanno), 'r') as scan: return get_nxclass(scan, "NXdata").xpad_image.shape[0] def get_attenuation(self, index, h5_nodes, offset): attenuation = None if self.config.attenuation_coefficient is not None: try: node = h5_nodes['attenuation'] if node is not None: attenuation = node[index + offset] else: raise Exception("you asked for attenuation but the file does not contain attenuation informations.") except IndexError: attenuation = WRONG_ATTENUATION return attenuation def get_values(self, index, h5_nodes): image = h5_nodes['image'][index] mu = h5_nodes['mu'][index] omega = h5_nodes['omega'][index] delta = h5_nodes['delta'][index] gamma = h5_nodes['gamma'][index] attenuation = self.get_attenuation(index, h5_nodes, 2) return (image, attenuation, (mu, omega, delta, gamma)) def process_image(self, index, dataframe, pixels, mask): util.status(str(index)) # extract the data from the h5 nodes h5_nodes = dataframe.h5_nodes intensity, attenuation, values = self.get_values(index, h5_nodes) # BEWARE in order to avoid precision problem we convert the # uint16 -> float32. (the size of the mantis is on 23 bits) # enought to contain the uint16. If one day we use uint32, it # should be necessary to convert into float64. intensity = intensity.astype('float32') weights = None if self.config.attenuation_coefficient is not None: if attenuation != WRONG_ATTENUATION: intensity *= self.config.attenuation_coefficient ** attenuation weights = numpy.ones_like(intensity) weights *= ~mask else: weights = numpy.zeros_like(intensity) else: weights = numpy.ones_like(intensity) weights *= ~mask k = 2 * math.pi / dataframe.source.wavelength hkl_geometry = dataframe.diffractometer.geometry hkl_geometry.axis_values_set(values, Hkl.UnitEnum.USER) # sample hkl_sample = dataframe.sample.sample q_sample = hkl_geometry.sample_rotation_get(hkl_sample) R = hkl_matrix_to_numpy(q_sample.to_matrix()) # detector hkl_detector = dataframe.detector.detector q_detector = hkl_geometry.detector_rotation_get(hkl_detector) P = hkl_matrix_to_numpy(q_detector.to_matrix()) if self.config.detrot is not None: P = numpy.dot(P, M(math.radians(self.config.detrot), [1, 0, 0])) pdataframe = PDataFrame(pixels, k, dataframe.diffractometer.ub, R, P) # util.status('{4}| gamma: {0}, delta: {1}, theta: {2}, mu: {3}'.format(gamma, delta, theta, mu, time.ctime(time.time()))) return intensity, weights, (index, pdataframe) def get_pixels(self, detector): detector = ALL_DETECTORS[detector.name]() y, x, _ = detector.calc_cartesian_positions() y0 = y[self.config.centralpixel[1], self.config.centralpixel[0]] x0 = x[self.config.centralpixel[1], self.config.centralpixel[0]] z = numpy.ones(x.shape) * -1 * self.config.sdd # return converted to the hkl library coordinates # x -> -y # y -> z # z -> -x return numpy.array([-z, -(x - x0), (y - y0)]) class FlyScanUHV2(FlyScanUHV): HPATH = { "image": HItem("xpad_image", False), "mu": HItem("mu", False), "omega": HItem("omega", False), "delta": HItem("delta", False), "gamma": HItem("gamma", False), "attenuation": HItem("attenuation", True), } class FlyMedH(FlyScanUHV): HPATH = { "image": HItem("xpad_image", False), "pitch": HItem("beta", True), "mu": HItem("mu", False), "gamma": HItem("gamma", False), "delta": HItem("delta", False), "attenuation": HItem("attenuation", True), } def get_values(self, index, h5_nodes): image = h5_nodes['image'][index] pitch = h5_nodes['pitch'][index] if h5_nodes['pitch'] else 0.3 mu = h5_nodes['mu'][index] gamma = h5_nodes['gamma'][index] delta = h5_nodes['delta'][index] attenuation = self.get_attenuation(index, h5_nodes, 2) return (image, attenuation, (pitch, mu, gamma, delta)) class SBSMedH(FlyScanUHV): HPATH = { "image": HItem("data_03", False), "pitch": HItem("data_22", False), "mu": HItem("data_18", False), "gamma": HItem("data_20", False), "delta": HItem("data_19", False), "attenuation": HItem("data_xx", True), } def get_pointcount(self, scanno): # just open the file in order to extract the number of step with tables.open_file(self.get_filename(scanno), 'r') as scan: return get_nxclass(scan, "NXdata").data_03.shape[0] def get_values(self, index, h5_nodes): image = h5_nodes['image'][index] pitch = h5_nodes['pitch'][index] mu = h5_nodes['mu'][index] gamma = h5_nodes['gamma'][index] delta = h5_nodes['delta'][index] attenuation = self.get_attenuation(index, h5_nodes, 2) return (image, attenuation, (pitch, mu, gamma, delta)) class FlyMedV(FlyScanUHV): HPATH = { "image": HItem("xpad_image", False), "beta": HItem("beta", True), "mu": HItem("mu", False), "omega": HItem("omega", False), "gamma": HItem("gamma", False), "delta": HItem("delta", False), "etaa": HItem("etaa", True), "attenuation": HItem("attenuation", True), } def get_values(self, index, h5_nodes): image = h5_nodes['image'][index] beta = h5_nodes['beta'][index] if h5_nodes['beta'] else 0.0 mu = h5_nodes['mu'][index] omega = h5_nodes['omega'][index] gamma = h5_nodes['gamma'][index] delta = h5_nodes['delta'][index] etaa = h5_nodes['etaa'][index] if h5_nodes['etaa'] else 0.0 attenuation = self.get_attenuation(index, h5_nodes, 2) return (image, attenuation, (beta, mu, omega, gamma, delta, etaa)) def load_matrix(filename): if filename is None: return None if os.path.exists(filename): ext = os.path.splitext(filename)[-1] if ext == '.txt': return numpy.array(numpy.loadtxt(filename), dtype=numpy.bool) elif ext == '.npy': mask = numpy.array(numpy.load(filename), dtype=numpy.bool) print("loaded mask sum: ", numpy.sum(mask)) return mask else: raise ValueError('unknown extension {0}, unable to load matrix!\n'.format(ext)) else: raise IOError('filename: {0} does not exist. Can not load matrix'.format(filename))