diff options
Diffstat (limited to 'src/sardana/pool/poolmotorgroup.py')
-rw-r--r-- | src/sardana/pool/poolmotorgroup.py | 271 |
1 files changed, 0 insertions, 271 deletions
diff --git a/src/sardana/pool/poolmotorgroup.py b/src/sardana/pool/poolmotorgroup.py deleted file mode 100644 index d845d986..00000000 --- a/src/sardana/pool/poolmotorgroup.py +++ /dev/null @@ -1,271 +0,0 @@ -#!/usr/bin/env python - -############################################################################## -## -## This file is part of Sardana -## -## http://www.sardana-controls.org/ -## -## Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain -## -## Sardana is free software: you can redistribute it and/or modify -## it under the terms of the GNU Lesser General Public License as published by -## the Free Software Foundation, either version 3 of the License, or -## (at your option) any later version. -## -## Sardana is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU Lesser General Public License for more details. -## -## You should have received a copy of the GNU Lesser General Public License -## along with Sardana. If not, see <http://www.gnu.org/licenses/>. -## -############################################################################## - -"""This module is part of the Python Pool library. It defines the base classes -for""" - -__all__ = ["PoolMotorGroup"] - -__docformat__ = 'restructuredtext' - -import time -import collections - -from sardana import ElementType -from sardana.sardanaattribute import SardanaAttribute -from sardana.pool.poolgroupelement import PoolGroupElement -from sardana.pool.poolmotion import PoolMotion - - -class Position(SardanaAttribute): - - def __init__(self, *args, **kwargs): - self._w_value_map = None - super(Position, self).__init__(*args, **kwargs) - for pos_attr in self.obj.get_physical_position_attribute_iterator(): - pos_attr.add_listener(self.on_change) - - def _has_value(self): - for pos_attr in self.obj.get_physical_position_attribute_iterator(): - if not pos_attr.has_value(): - return False - return True - - def _in_error(self): - for pos_attr in self.obj.get_physical_position_attribute_iterator(): - if pos_attr.in_error(): - return True - return False - - def get_elements(self): - return self.obj.get_user_elements() - - def get_element_nb(self): - return len(self.get_user_elements()) - - def _get_exc_info(self): - for position_attr in self.obj.get_physical_position_attribute_iterator(): - if position_attr.error: - return position_attr.get_exc_info() - - def _get_timestamp(self): - return max([ pos_attr.timestamp for pos_attr in self.obj.get_physical_position_attribute_iterator() ]) - - def _set_value(self, value, exc_info=None, timestamp=None, propagate=1): - raise Exception("Cannot set position value for motor group %s" % self.obj.name) - - def _get_value(self): - return [ position.value for position in self.obj.get_physical_position_attribute_iterator() ] - - def _get_write_value(self): - return [ position.w_value for position in self.obj.get_physical_position_attribute_iterator() ] - - def _set_write_value(self, w_value, timestamp=None, propagate=1): - assert len(w_value) == self.get_element_nb() - if isinstance(w_value, collections.Sequence): - w_value_map = {} - for v, elem in zip(w_value, self.get_elements()): - w_value_map[elem] = v - else: - w_value_map = w_value - w_value = [] - for elem in self.get_elements(): - w_value.append(w_value_map[elem]) - self._w_value_map = w_value - super(Position, self).set_write_value(w_value, timestamp=timestamp, - propagate=propagate) - - def on_change(self, evt_src, evt_type, evt_value): - self.fire_read_event(propagate=evt_type.priority) - - def update(self, cache=True, propagate=1): - if cache: - for phy_elem_pos in self.obj.get_low_level_physical_position_attribute_iterator(): - if not phy_elem_pos.has_value(): - cache = False - break - if not cache: - dial_position_values = self.obj.motion.read_dial_position(serial=True) - for motion_obj, position_value in dial_position_values.items(): - motion_obj.put_dial_position(position_value, propagate=propagate) - - -class PoolMotorGroup(PoolGroupElement): - - def __init__(self, **kwargs): - self._physical_elements = [] - self._in_start_move = False - kwargs['elem_type'] = ElementType.MotorGroup - PoolGroupElement.__init__(self, **kwargs) - on_change = self.on_change - self._position = Position(self, listeners=on_change) - - def _create_action_cache(self): - motion_name = "%s.Motion" % self._name - return PoolMotion(self, motion_name) - - # -------------------------------------------------------------------------- - # Event forwarding - # -------------------------------------------------------------------------- - - def on_change(self, evt_src, evt_type, evt_value): - # forward all events coming from attributes to the listeners - self.fire_event(evt_type, evt_value) - - def on_element_changed(self, evt_src, evt_type, evt_value): - name = evt_type.name.lower() - if name in ('state', 'position'): - state, status = self._calculate_states() - if name == 'state': - propagate_state = evt_type.priority - else: - propagate_state = 0 - self.set_state(state, propagate=propagate_state) - self.set_status(status, propagate=propagate_state) - - def add_user_element(self, element, index=None): - elem_type = element.get_type() - if elem_type == ElementType.Motor: - pass - elif elem_type == ElementType.PseudoMotor: - #TODO: make this happen - pass - else: - raise Exception("element %s is not a motor" % element.name) - - PoolGroupElement.add_user_element(self, element, index=index) - - # -------------------------------------------------------------------------- - # position - # -------------------------------------------------------------------------- - - def get_position_attribute(self): - return self._position - - def get_low_level_physical_position_attribute_iterator(self): - return self.get_physical_elements_attribute_iterator() - - def get_physical_position_attribute_iterator(self): - return self.get_user_elements_attribute_iterator() - - def get_physical_positions_attribute_sequence(self): - return self.get_user_elements_attribute_sequence() - - def get_physical_positions_attribute_map(self): - return self.get_user_elements_attribute_map() - - def get_position(self, cache=True, propagate=1): - """Returns the user position. - - :param cache: - if ``True`` (default) return value in cache, otherwise read value - from hardware - :type cache: - bool - :param propagate: - 0 for not propagating, 1 to propagate, 2 propagate with priority - :type propagate: - int - :return: - the user position - :rtype: - :class:`~sardana.sardanaattribute.SardanaAttribute`""" - position = self._position - position.update(cache=cache, propagate=propagate) - return position - - def set_position(self, positions): - """Moves the motor group to the specified user positions - - :param positions: - the user positions to move to - :type positions: - sequence< :class:`~numbers.Number` >""" - self.start_move(positions) - - def set_write_position(self, w_position, timestamp=None, propagate=1): - """Sets a new write value for the user position. - - :param w_position: - the new write value for user position - :type w_position: - sequence< :class:`~numbers.Number` > - :param propagate: - 0 for not propagating, 1 to propagate, 2 propagate with priority - :type propagate: int""" - self._position.set_write_value(w_position, timestamp=timestamp, - propagate=propagate) - - position = property(get_position, set_position, doc="motor group positions") - - # -------------------------------------------------------------------------- - # default acquisition channel - # -------------------------------------------------------------------------- - - def get_default_attribute(self): - return self.get_position_attribute() - - # -------------------------------------------------------------------------- - # motion - # -------------------------------------------------------------------------- - - def get_motion(self): - return self.get_action_cache() - - motion = property(get_motion, doc="motion object") - - # -------------------------------------------------------------------------- - # motion calculation - # -------------------------------------------------------------------------- - - def calculate_motion(self, new_positions, items=None): - user_elements = self.get_user_elements() - if items is None: - items = {} - calculated = {} - for new_position, element in zip(new_positions, user_elements): - calculated[element] = new_position - - for new_position, element in zip(new_positions, user_elements): - element.calculate_motion(new_position, items=items, - calculated=calculated) - return items - - def start_move(self, new_position): - self._in_start_move = True - try: - return self._start_move(new_position) - finally: - self._in_start_move = False - - def _start_move(self, new_positions): - self._aborted = False - items = self.calculate_motion(new_positions) - timestamp = time.time() - for item, position_info in items.items(): - item.set_write_position(position_info[0], timestamp=timestamp, - propagate=0) - if not self._simulation_mode: - self.motion.run(items=items) |