#!/usr/bin/env python
#############################################################################
##
# This file is part of Taurus
##
# http://taurus-scada.org
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Taurus is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Taurus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Taurus. If not, see .
##
#############################################################################
"""Extension of :mod:`guiqwt.curve`"""
from __future__ import print_function
from builtins import next
from taurus.external.qt import Qt
from taurus.qt.qtgui.base import TaurusBaseComponent
from taurus.qt.qtcore.util.signal import baseSignal
import taurus
from guiqwt.curve import CurveItem
from taurus.qt.qtgui.extra_guiqwt.styles import TaurusCurveParam, TaurusTrendParam
from taurus.core.util.containers import ArrayBuffer
import numpy
__all__ = ["TaurusCurveItem"]
class TaurusCurveItem(CurveItem, TaurusBaseComponent):
'''A CurveItem that autoupdates its values & params when x or y components change'''
dataChanged = baseSignal('dataChanged')
def __init__(self, curveparam=None, taurusparam=None):
CurveItem.__init__(self, curveparam=curveparam)
TaurusBaseComponent.__init__(self, self.__class__.__name__)
# I need to do this because I am not using the standard model attach
# mechanism
self.taurusEvent.connect(self.filterEvent)
self._xcomp = None
self._ycomp = None
if taurusparam is None:
taurusparam = TaurusCurveParam()
self.taurusparam = taurusparam
def setModels(self, x, y):
# create/get new components
if x is None:
newX = None
else:
newX = taurus.Attribute(x)
newY = taurus.Attribute(y)
# stop listening to previous components (if they are not the same as
# the new)
if self._xcomp is not None and self._xcomp is not newX:
self._xcomp.removeListener(self)
self._xcomp = newX
if self._ycomp is not None and self._ycomp is not newY:
self._ycomp.removeListener(self)
self._ycomp = newY
# start listening to new components
if self._xcomp is not None:
self._xcomp.addListener(self)
self._ycomp.addListener(self)
self.onCurveDataChanged()
self.taurusparam.xModel = x
self.taurusparam.yModel = y
def getModels(self):
return self.taurusparam.xModel, self.taurusparam.yModel
def handleEvent(self, evt_src, ect_type, evt_value):
if evt_value is None or getattr(evt_value, 'rvalue', None) is None:
self.debug('Ignoring event from %s' % repr(evt_src))
return
if evt_src is self._xcomp or evt_src is self._ycomp:
self.onCurveDataChanged()
self.dataChanged.emit()
def onCurveDataChanged(self):
# TODO: Take units into account for displaying curves, axis, etc.
try:
if self._ycomp.isNumeric():
yvalue = self._ycomp.read().rvalue.magnitude
else:
yvalue = self._ycomp.read().rvalue
except:
yvalue = None
if yvalue is None:
return
# TODO: Take units into account for displaying curves, axis, etc.
try:
if self._xcomp.isNumeric():
xvalue = self._xcomp.read().rvalue.magnitude
else:
xvalue = self._xcomp.read().rvalue
except:
xvalue = None
if xvalue is None:
xvalue = numpy.arange(len(yvalue))
self.set_data(xvalue, yvalue)
p = self.plot()
if p is not None:
p.replot()
def get_item_parameters(self, itemparams):
CurveItem.get_item_parameters(self, itemparams)
itemparams.add("TaurusParam", self, self.taurusparam)
def updateTaurusParams(self):
self.taurusparam.update_curve(self)
def set_item_parameters(self, itemparams):
CurveItem.set_item_parameters(self, itemparams)
self.updateTaurusParams()
class TaurusTrendItem(CurveItem, TaurusBaseComponent):
'''A CurveItem that listens to events from a Taurus scalar attribute and appends new values to it'''
dataChanged = baseSignal('dataChanged')
scrollRequested = baseSignal('scrollRequested', object, object, object)
def __init__(self, curveparam=None, taurusparam=None):
CurveItem.__init__(self, curveparam=curveparam)
TaurusBaseComponent.__init__(self, self.__class__.__name__)
self.__xBuffer = None
self.__yBuffer = None
self.__timeOffset = None
if taurusparam is None:
taurusparam = TaurusTrendParam()
self.taurusparam = taurusparam
self.updateTaurusParams()
def setBufferSize(self, buffersize):
'''sets the size of the stack.
:param buffersize: (int) size of the stack
'''
self.taurusparam.maxBufferSize = buffersize
try:
if self.__xBuffer is not None:
self.__xBuffer.setMaxSize(buffersize)
if self.__yBuffer is not None:
self.__yBuffer.setMaxSize(buffersize)
except ValueError:
self.info(
'buffer downsizing requested. Current contents will be discarded')
self.__xBuffer = None
self.__yBuffer = None
def setModel(self, model):
# do the standard stuff
TaurusBaseComponent.setModel(self, model)
# update the taurusparam
self.taurusparam.model = self.getModelName()
#... and fire a fake event for initialization
try:
value = self.getModelObj().read()
self.fireEvent(
self, taurus.core.taurusbasetypes.TaurusEventType.Change, value)
except:
pass
def handleEvent(self, evt_src, evt_type, evt_value):
if evt_value is None or getattr(evt_value, 'rvalue', None) is None:
self.debug('Ignoring event from %s' % repr(evt_src))
return
plot = self.plot()
# initialization\
if self.__xBuffer is None:
self.__xBuffer = ArrayBuffer(numpy.zeros(min(
128, self.taurusparam.maxBufferSize), dtype='d'), maxSize=self.taurusparam.maxBufferSize)
if self.__yBuffer is None:
self.__yBuffer = ArrayBuffer(numpy.zeros(min(
128, self.taurusparam.maxBufferSize), dtype='d'), maxSize=self.taurusparam.maxBufferSize)
# update x values
if self.taurusparam.stackMode == 'datetime':
if self.__timeOffset is None:
self.__timeOffset = evt_value.time.totime()
if plot is not None:
plot.set_axis_title('bottom', 'Time')
plot.set_axis_unit('bottom', '')
self.__xBuffer.append(evt_value.time.totime())
elif self.taurusparam.stackMode == 'deltatime':
try:
self.__xBuffer.append(
evt_value.time.totime() - self.__timeOffset)
except TypeError: # this will happen if self.__timeOffset has not been initialized
self.__timeOffset = evt_value.time.totime()
self.__xBuffer.append(0)
if plot is not None:
plot.set_axis_title('bottom', 'Time since %s' %
evt_value.time.isoformat())
plot.set_axis_unit('bottom', '')
else:
try:
# +numpy.random.randint(0,4) #for debugging we can put a variable step
step = 1
self.__xBuffer.append(self.__xBuffer[-1] + step)
except IndexError: # this will happen when the x buffer is empty
self.__xBuffer.append(0)
if plot is not None:
plot.set_axis_title('bottom', 'Event #')
plot.set_axis_unit('bottom', '')
# update y
# TODO: Take units into account for displaying curves, axis, etc.
if self.__yBuffer.isNumeric():
self.__yBuffer.append(evt_value.rvalue.magnitude)
else:
self.__yBuffer.append(evt_value.rvalue)
# update the plot data
x, y = self.__xBuffer.contents(), self.__yBuffer.contents()
self.set_data(x, y)
# signal data changed and replot
self.dataChanged.emit()
if plot is not None:
value = x[-1]
axis = self.xAxis()
xmin, xmax = plot.get_axis_limits(axis)
if value > xmax or value < xmin:
self.scrollRequested.emit(plot, axis, value)
plot.replot()
def get_item_parameters(self, itemparams):
CurveItem.get_item_parameters(self, itemparams)
itemparams.add("TaurusParam", self, self.taurusparam)
def updateTaurusParams(self):
self.taurusparam.update_curve(self)
def set_item_parameters(self, itemparams):
CurveItem.set_item_parameters(self, itemparams)
self.updateTaurusParams()
def taurusTrendMain():
from taurus.qt.qtgui.extra_guiqwt.builder import make
from taurus.qt.qtgui.application import TaurusApplication
from guiqwt.plot import CurveDialog
from guiqwt.tools import HRangeTool
import taurus.core.util.argparse
import sys
parser = taurus.core.util.argparse.get_taurus_parser()
parser.set_usage("%prog [options] [ [] ...]")
parser.set_description("a taurus application for plotting 1D data sets")
parser.add_option("-x", "--x-axis-mode", dest="x_axis_mode", default='d', metavar="t|d|e",
help="interpret X values as timestamps (t), time deltas (d) or event numbers (e). Accepted values: t|d|e")
parser.add_option("-b", "--buffer", dest="max_buffer_size", default='16384',
help="maximum number of values to be plotted (when reached, the oldest values will be discarded)")
parser.add_option("-a", "--use-archiving",
action="store_true", dest="use_archiving", default=False)
parser.add_option("--demo", action="store_true", dest="demo",
default=False, help="show a demo of the widget")
app = TaurusApplication(
cmd_line_parser=parser, app_name="taurusplot2", app_version=taurus.Release.version)
args = app.get_command_line_args()
options = app.get_command_line_options()
# check & process options
stackModeMap = dict(t='datetime', d='deltatime', e='event')
if options.x_axis_mode.lower() not in stackModeMap:
parser.print_help(sys.stderr)
sys.exit(1)
stackMode = stackModeMap[options.x_axis_mode.lower()]
if options.demo:
args.append('eval:rand()')
w = CurveDialog(edit=False, toolbar=True, wintitle="Taurus Trend")
# set archiving
if options.use_archiving:
raise NotImplementedError('Archiving support is not yet implemented')
w.setUseArchiving(True)
w.add_tool(HRangeTool)
# w.add_tool(TaurusCurveChooserTool)
# w.add_tool(TimeAxisTool)
if len(args) == 0:
parser.print_help(sys.stderr)
sys.exit(1)
plot = w.get_plot()
for a in args:
item = TaurusTrendItem(stackMode=stackMode,
buffersize=int(options.max_buffer_size))
plot.add_item(item)
item.setModel(a)
w.show()
sys.exit(app.exec_())
def taurusCurveMain():
from taurus.qt.qtgui.extra_guiqwt.builder import make
from taurus.qt.qtgui.application import TaurusApplication
from guiqwt.plot import CurveDialog
from guiqwt.tools import HRangeTool
from taurus.qt.qtgui.extra_guiqwt.tools import TaurusCurveChooserTool, TimeAxisTool
import taurus.core.util.argparse
import sys
parser = taurus.core.util.argparse.get_taurus_parser()
parser.set_usage("%prog [options] [ [] ...]")
parser.set_description("a taurus application for plotting 1D data sets")
app = TaurusApplication(
cmd_line_parser=parser, app_name="taurusplot2", app_version=taurus.Release.version)
args = app.get_command_line_args()
win = CurveDialog(edit=False, toolbar=True, wintitle="TaurusPlot2",
options=dict(title="", xlabel="xlabel", ylabel="ylabel"))
win.add_tool(HRangeTool)
win.add_tool(TaurusCurveChooserTool)
win.add_tool(TimeAxisTool)
plot = win.get_plot()
for a in args:
mx_my = a.split('|')
n = len(mx_my)
if n == 1:
mx, my = None, mx_my[0]
elif n == 2:
mx, my = mx_my
else:
print("Invalid model: %s\n" % mx_my)
parser.print_help(sys.stderr)
sys.exit(1)
# cycle colors
style = next(make.style)
color = style[0]
linestyle = style[1:]
plot.add_item(make.curve(mx, my, color=color,
linestyle=linestyle, linewidth=2))
win.get_itemlist_panel().show()
plot.set_items_readonly(False)
win.show()
win.exec_()
if __name__ == "__main__":
# test1()
# taurusCurveMain()
taurusTrendMain()