summaryrefslogtreecommitdiff
path: root/tango
diff options
context:
space:
mode:
authorBodo-Merle Sandor <sbodomerle@gmail.com>2018-06-02 14:35:03 +0200
committerBodo-Merle Sandor <sbodomerle@gmail.com>2018-06-02 14:35:03 +0200
commit5bab4440d34980f3722a8e784b6a72804acf5c72 (patch)
tree88360d94c9cc7516d3494f30b2c773d0f9a44a1d /tango
parentc5e62c5a005860459d6ec6b8d104347003e981e9 (diff)
New upstream version 9.2.3
Diffstat (limited to 'tango')
-rw-r--r--tango/asyncio_executor.py8
-rw-r--r--tango/attribute_proxy.py2
-rw-r--r--tango/databaseds/database.py49
-rw-r--r--tango/device_proxy.py37
-rw-r--r--tango/futures_executor.py1
-rw-r--r--tango/gevent_executor.py76
-rw-r--r--tango/green.py19
-rw-r--r--tango/release.py4
-rw-r--r--tango/tango_object.py4
9 files changed, 104 insertions, 96 deletions
diff --git a/tango/asyncio_executor.py b/tango/asyncio_executor.py
index 5eb0560..b1bdf5e 100644
--- a/tango/asyncio_executor.py
+++ b/tango/asyncio_executor.py
@@ -15,11 +15,6 @@ from __future__ import absolute_import
# Imports
import functools
-try:
- from threading import get_ident
-except:
- from threading import _get_ident as get_ident
-
# Asyncio imports
try:
import asyncio
@@ -65,6 +60,7 @@ class AsyncioExecutor(AbstractExecutor):
default_wait = False
def __init__(self, loop=None, subexecutor=None):
+ super(AsyncioExecutor, self).__init__()
if loop is None:
try:
loop = asyncio.get_event_loop()
@@ -94,7 +90,7 @@ class AsyncioExecutor(AbstractExecutor):
def execute(self, fn, *args, **kwargs):
"""Execute an operation and return the result."""
- if self.loop._thread_id == get_ident():
+ if self.in_executor_context():
corofn = asyncio.coroutine(lambda: fn(*args, **kwargs))
return corofn()
future = self.submit(fn, *args, **kwargs)
diff --git a/tango/attribute_proxy.py b/tango/attribute_proxy.py
index e69c9be..fda21a4 100644
--- a/tango/attribute_proxy.py
+++ b/tango/attribute_proxy.py
@@ -29,7 +29,7 @@ from .device_proxy import __init_device_proxy_internals as init_device_proxy
__all__ = ["AttributeProxy", "attribute_proxy_init", "get_attribute_proxy"]
-@green(consume_green_mode=True)
+@green(consume_green_mode=False)
def get_attribute_proxy(*args, **kwargs):
"""
get_attribute_proxy(self, full_attr_name, green_mode=None, wait=True, timeout=True) -> AttributeProxy
diff --git a/tango/databaseds/database.py b/tango/databaseds/database.py
index 9c4c7f3..6d6aa53 100644
--- a/tango/databaseds/database.py
+++ b/tango/databaseds/database.py
@@ -16,14 +16,11 @@ except ImportError:
import tango
from tango import AttrWriteType, GreenMode
-from tango.server import Device, DeviceMeta
+from tango.server import Device
from tango.server import attribute, command
-from tango.server import class_property
-from tango.server import device_property
from tango.server import run
-from tango.globals import get_class, get_class_by_class, \
- get_constructed_class_by_class
+from tango.globals import get_class_by_class, get_constructed_class_by_class
READ_ONLY = AttrWriteType.READ
@@ -31,10 +28,12 @@ WRITE_ONLY = AttrWriteType.WRITE
READ_WRITE = AttrWriteType.READ_WRITE
READ_WITH_WRITE = AttrWriteType.READ_WITH_WRITE
-#Argument Options
+# Argument Options
global options
global WILDCARD_REPLACEMENT
WILDCARD_REPLACEMENT = True
+
+
class DbInter(tango.Interceptors):
def create_thread(self):
@@ -43,12 +42,15 @@ class DbInter(tango.Interceptors):
def delete_thread(self):
pass
+
DB_NAME = None
+
def set_db_name(db_name):
global DB_NAME
DB_NAME = db_name
+
def get_db_name():
return DB_NAME
@@ -59,6 +61,7 @@ th_exc = tango.Except.throw_exception
from .db_errors import *
+
def check_device_name(dev_name):
if '*' in dev_name:
return False, None, None
@@ -69,7 +72,7 @@ def check_device_name(dev_name):
dev_name = dev_name[5:]
if dev_name.startswith("//"):
dev_name = dev_name[2:]
- if not '/' in dev_name or dev_name.startswith("/"):
+ if '/' not in dev_name or dev_name.startswith("/"):
return False, None, None
dfm = dev_name.split("/")
if len(dfm) != 3:
@@ -79,6 +82,7 @@ def check_device_name(dev_name):
return False, None, None
return True, dev_name, dfm
+
def replace_wildcard(text):
if not WILDCARD_REPLACEMENT:
return text
@@ -94,6 +98,7 @@ def replace_wildcard(text):
text = text.replace("*", "%")
return text
+
class TimeStructure:
def __init__(self):
self.average = 0
@@ -104,8 +109,10 @@ class TimeStructure:
self.calls = 0
self.index = ''
+
def stats(f):
fname = f.__name__
+
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
start = time.time()
@@ -116,6 +123,7 @@ def stats(f):
update_timing_stats(self, start, end, fname)
return wrapper
+
def update_timing_stats(dev, time_before, time_after, cmd_name):
tmp_time = dev.timing_maps[cmd_name]
time_elapsed = (time_after - time_before) * 1000.
@@ -125,7 +133,12 @@ def update_timing_stats(dev, time_before, time_after, cmd_name):
if time_elapsed < tmp_time.minimum or tmp_time.minimum == 0:
tmp_time.minimum = time_elapsed
tmp_time.calls = tmp_time.calls + 1
- tmp_time.average = tmp_time.total_elapsed/tmp_time.calls
+ tmp_time.average = tmp_time.total_elapsed / tmp_time.calls
+
+
+def get_plugin(name):
+ fullname = '%s.%s' % (db_access.__package__, name)
+ return __import__(fullname, None, None, fullname)
class DataBase(Device):
@@ -135,27 +148,26 @@ class DataBase(Device):
# --- attributes ---------------------------------------
- Timing_maximum = attribute(dtype=('float64',),max_dim_x=128, access=READ_ONLY)
+ Timing_maximum = attribute(dtype=('float64',), max_dim_x=128, access=READ_ONLY)
- Timing_average = attribute(dtype=('float64',),max_dim_x=128, access=READ_ONLY)
+ Timing_average = attribute(dtype=('float64',), max_dim_x=128, access=READ_ONLY)
- Timing_index = attribute(dtype=('str',),max_dim_x=128, access=READ_ONLY)
+ Timing_index = attribute(dtype=('str',), max_dim_x=128, access=READ_ONLY)
- Timing_calls = attribute(dtype=('float64',),max_dim_x=128, access=READ_ONLY)
+ Timing_calls = attribute(dtype=('float64',), max_dim_x=128, access=READ_ONLY)
- Timing_info = attribute(dtype=('str',),max_dim_x=128, access=READ_ONLY)
+ Timing_info = attribute(dtype=('str',), max_dim_x=128, access=READ_ONLY)
StoredProcedureRelease = attribute(dtype='str', access=READ_ONLY)
- Timing_minimum = attribute(dtype=('float64',),max_dim_x=128, access=READ_ONLY)
+ Timing_minimum = attribute(dtype=('float64',), max_dim_x=128, access=READ_ONLY)
def init_device(self):
- self._log = log = logging.getLogger(self.get_name())
+ self._log = logging.getLogger(self.get_name())
self._log.debug("In init_device()")
self.attr_StoredProcedureRelease_read = ''
self.init_timing_stats()
- m = __import__('%s.%s' % (db_access.__package__,options.db_access),None,None,
- '%s.%s' % (db_access.__package__,options.db_access))
+ m = get_plugin(options.db_access)
self.db = m.get_db(personal_name = options.argv[1])
try:
global WILDCARD_REPLACEMENT
@@ -1749,6 +1761,9 @@ def main(argv = None):
(options,args) = parser.parse_args(argv)
options.argv = ["DataBaseds"] + args
+ # Check plugin availability
+ get_plugin(options.db_access)
+
port = options.port
if port is None:
try:
diff --git a/tango/device_proxy.py b/tango/device_proxy.py
index 4a90c35..c8a05b1 100644
--- a/tango/device_proxy.py
+++ b/tango/device_proxy.py
@@ -1563,39 +1563,40 @@ def __init_DeviceProxy():
DeviceProxy.write_read_attribute = green(__DeviceProxy__write_read_attribute)
DeviceProxy.write_read_attributes = green(__DeviceProxy__write_read_attributes)
- DeviceProxy.read_attributes_asynch = __DeviceProxy__read_attributes_asynch
- DeviceProxy.read_attribute_asynch = __DeviceProxy__read_attribute_asynch
- DeviceProxy.read_attribute_reply = __DeviceProxy__read_attribute_reply
- DeviceProxy.write_attributes_asynch = __DeviceProxy__write_attributes_asynch
- DeviceProxy.write_attribute_asynch = __DeviceProxy__write_attribute_asynch
- DeviceProxy.write_attribute_reply = __DeviceProxy__write_attribute_reply
+ DeviceProxy.read_attributes_asynch = green(__DeviceProxy__read_attributes_asynch)
+ DeviceProxy.read_attribute_asynch = green(__DeviceProxy__read_attribute_asynch)
+ DeviceProxy.read_attribute_reply = green(__DeviceProxy__read_attribute_reply)
+ DeviceProxy.write_attributes_asynch = green(__DeviceProxy__write_attributes_asynch)
+ DeviceProxy.write_attribute_asynch = green(__DeviceProxy__write_attribute_asynch)
+ DeviceProxy.write_attribute_reply = green(__DeviceProxy__write_attribute_reply)
DeviceProxy.read_pipe = green(__DeviceProxy__read_pipe)
DeviceProxy.write_pipe = green(__DeviceProxy__write_pipe)
- DeviceProxy.get_property = __DeviceProxy__get_property
- DeviceProxy.put_property = __DeviceProxy__put_property
- DeviceProxy.delete_property = __DeviceProxy__delete_property
- DeviceProxy.get_property_list = __DeviceProxy__get_property_list
- DeviceProxy.get_attribute_config = __DeviceProxy__get_attribute_config
- DeviceProxy.get_attribute_config_ex = __DeviceProxy__get_attribute_config_ex
- DeviceProxy.set_attribute_config = __DeviceProxy__set_attribute_config
+ DeviceProxy.get_property = green(__DeviceProxy__get_property)
+ DeviceProxy.put_property = green(__DeviceProxy__put_property)
+ DeviceProxy.delete_property = green(__DeviceProxy__delete_property)
+ DeviceProxy.get_property_list = green(__DeviceProxy__get_property_list)
+ DeviceProxy.get_attribute_config = green(__DeviceProxy__get_attribute_config)
+ DeviceProxy.get_attribute_config_ex = green(__DeviceProxy__get_attribute_config_ex)
+ DeviceProxy.set_attribute_config = green(__DeviceProxy__set_attribute_config)
- DeviceProxy.get_command_config = __DeviceProxy__get_command_config
+ DeviceProxy.get_command_config = green(__DeviceProxy__get_command_config)
- DeviceProxy.get_pipe_config = __DeviceProxy__get_pipe_config
- DeviceProxy.set_pipe_config = __DeviceProxy__set_pipe_config
+ DeviceProxy.get_pipe_config = green(__DeviceProxy__get_pipe_config)
+ DeviceProxy.set_pipe_config = green(__DeviceProxy__set_pipe_config)
DeviceProxy.__get_event_map = __DeviceProxy__get_event_map
DeviceProxy.__get_event_map_lock = __DeviceProxy__get_event_map_lock
+
DeviceProxy.subscribe_event = green(
__DeviceProxy__subscribe_event, consume_green_mode=False)
DeviceProxy.unsubscribe_event = green(__DeviceProxy__unsubscribe_event)
- DeviceProxy.__unsubscribe_event_all = __DeviceProxy__unsubscribe_event_all
DeviceProxy.get_events = __DeviceProxy__get_events
+ DeviceProxy.__unsubscribe_event_all = __DeviceProxy__unsubscribe_event_all
+
DeviceProxy.__str__ = __DeviceProxy__str
DeviceProxy.__repr__ = __DeviceProxy__str
-
DeviceProxy._get_info_ = __DeviceProxy___get_info_
diff --git a/tango/futures_executor.py b/tango/futures_executor.py
index a6dedc2..f7c2978 100644
--- a/tango/futures_executor.py
+++ b/tango/futures_executor.py
@@ -46,6 +46,7 @@ class FuturesExecutor(AbstractExecutor):
default_wait = True
def __init__(self, process=False, max_workers=20):
+ super(FuturesExecutor, self).__init__()
cls = ProcessPoolExecutor if process else ThreadPoolExecutor
self.subexecutor = cls(max_workers=max_workers)
diff --git a/tango/gevent_executor.py b/tango/gevent_executor.py
index 02368f1..0ea2b56 100644
--- a/tango/gevent_executor.py
+++ b/tango/gevent_executor.py
@@ -15,27 +15,26 @@ from __future__ import absolute_import
# Imports
import sys
import six
-import types
import functools
-# Combatibility imports
-try:
- from threading import get_ident
-except:
- from threading import _get_ident as get_ident
-
# Gevent imports
import gevent.queue
+import gevent.monkey
+import gevent.threadpool
+
+# Bypass gevent monkey patching
+ThreadSafeEvent = gevent.monkey.get_original('threading', 'Event')
# Tango imports
from .green import AbstractExecutor
+
__all__ = ["get_global_executor", "set_global_executor", "GeventExecutor"]
# Global executor
_EXECUTOR = None
-
+_THREAD_POOL = None
def get_global_executor():
global _EXECUTOR
@@ -49,18 +48,11 @@ def set_global_executor(executor):
_EXECUTOR = executor
-# Patch for gevent threadpool
-
def get_global_threadpool():
- """Before gevent-1.1.0, patch the spawn method to propagate exception
- raised in the loop to the AsyncResult.
- """
- threadpool = gevent.get_hub().threadpool
- if gevent.version_info < (1, 1) and not hasattr(threadpool, '_spawn'):
- threadpool._spawn = threadpool.spawn
- threadpool.spawn = types.MethodType(
- spawn, threadpool, type(threadpool))
- return threadpool
+ global _THREAD_POOL
+ if _THREAD_POOL is None:
+ _THREAD_POOL = ThreadPool(maxsize=10**4)
+ return _THREAD_POOL
class ExceptionWrapper:
@@ -70,7 +62,7 @@ class ExceptionWrapper:
self.tb = tb
-def wrap_errors(func):
+def wrap_error(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
@@ -81,34 +73,32 @@ def wrap_errors(func):
return wrapper
-def get_with_exception(result, block=True, timeout=None):
- result = result._get(block, timeout)
+def unwrap_error(source):
+ result = source.get()
if isinstance(result, ExceptionWrapper):
# Raise the exception using the caller context
six.reraise(result.exception, result.error_string, result.tb)
return result
-def spawn(threadpool, fn, *args, **kwargs):
- # The gevent threadpool do not raise exception with async results,
- # we have to wrap it
- fn = wrap_errors(fn)
- result = threadpool._spawn(fn, *args, **kwargs)
- result._get = result.get
- result.get = types.MethodType(get_with_exception, result, type(result))
- return result
+class ThreadPool(gevent.threadpool.ThreadPool):
+
+ def spawn(self, fn, *args, **kwargs):
+ fn = wrap_error(fn)
+ fn_result = super(ThreadPool, self).spawn(fn, *args, **kwargs)
+ return gevent.spawn(unwrap_error, fn_result)
# Gevent task and event loop
class GeventTask:
- def __init__(self, event, func, *args, **kwargs):
- self.event = event
+ def __init__(self, func, *args, **kwargs):
self.func = func
self.args = args
self.kwargs = kwargs
self.value = None
self.exception = None
+ self.event = ThreadSafeEvent()
def run(self):
try:
@@ -130,22 +120,13 @@ class GeventTask:
class GeventLoop:
def __init__(self):
- self.thread_id = get_ident()
- self.tasks = gevent.queue.Queue()
- self.loop = gevent.spawn(self.run)
-
- def run(self):
- while True:
- self.tasks.get().spawn()
-
- def is_gevent_thread(self):
- return self.thread_id == get_ident()
+ self.hub = gevent.get_hub()
def submit(self, func, *args, **kwargs):
- event = gevent._threading.Event()
- task = GeventTask(event, func, *args, **kwargs)
- self.tasks.put_nowait(task)
- self.tasks.hub.loop.async().send()
+ task = GeventTask(func, *args, **kwargs)
+ watcher = self.hub.loop.async()
+ watcher.start(task.spawn)
+ watcher.send()
return task
@@ -158,6 +139,7 @@ class GeventExecutor(AbstractExecutor):
default_wait = True
def __init__(self, loop=None, subexecutor=None):
+ super(GeventExecutor, self).__init__()
if loop is None:
loop = GeventLoop()
if subexecutor is None:
@@ -178,7 +160,7 @@ class GeventExecutor(AbstractExecutor):
def execute(self, fn, *args, **kwargs):
"""Execute an operation and return the result."""
- if self.loop.is_gevent_thread():
+ if self.in_executor_context():
return fn(*args, **kwargs)
task = self.submit(fn, *args, **kwargs)
return task.result()
diff --git a/tango/green.py b/tango/green.py
index 020c4d7..2bd4033 100644
--- a/tango/green.py
+++ b/tango/green.py
@@ -14,6 +14,12 @@
import os
from functools import wraps
+# Compatibility imports
+try:
+ from threading import get_ident
+except:
+ from threading import _get_ident as get_ident
+
# Tango imports
from ._tango import GreenMode
@@ -62,6 +68,12 @@ class AbstractExecutor(object):
asynchronous = NotImplemented
default_wait = NotImplemented
+ def __init__(self):
+ self.thread_id = get_ident()
+
+ def in_executor_context(self):
+ return self.thread_id == get_ident()
+
def delegate(self, fn, *args, **kwargs):
"""Delegate an operation and return an accessor."""
if not self.asynchronous:
@@ -89,10 +101,11 @@ class AbstractExecutor(object):
def run(self, fn, args=(), kwargs={}, wait=None, timeout=None):
if wait is None:
wait = self.default_wait
+ # Wait and timeout are not supported in synchronous mode
+ if not self.asynchronous and (not wait or timeout):
+ raise ValueError('Not supported in synchronous mode')
# Sychronous (no delegation)
- if not self.asynchronous:
- if not wait or timeout:
- raise ValueError('Not supported in synchronous mode')
+ if not self.asynchronous or not self.in_executor_context():
return fn(*args, **kwargs)
# Asynchronous delegation
accessor = self.delegate(fn, *args, **kwargs)
diff --git a/tango/release.py b/tango/release.py
index 48cba49..83869be 100644
--- a/tango/release.py
+++ b/tango/release.py
@@ -44,8 +44,8 @@ class Release:
- keywords: (seq<str>) list of keywords
- license: (str) the license
"""
- name = 'PyTango'
- version_info = (9, 2, 2)
+ name = 'pytango'
+ version_info = (9, 2, 3)
version = '.'.join(map(str, version_info[:3]))
release = ''.join(map(str, version_info[3:]))
separator = '.' if 'dev' in release or 'post' in release else ''
diff --git a/tango/tango_object.py b/tango/tango_object.py
index 825b1f4..7cac855 100644
--- a/tango/tango_object.py
+++ b/tango/tango_object.py
@@ -13,7 +13,7 @@
import os
import sys
-import types
+import six
import logging
import weakref
import inspect
@@ -155,7 +155,7 @@ def create_tango_class(server, obj, tango_class_name=None, member_filter=None):
if doc is None:
doc = ""
cmd.__doc__ = doc
- cmd = types.MethodType(cmd, None, DeviceDispatcher)
+ cmd = six.create_unbound_method(cmd, DeviceDispatcher)
setattr(DeviceDispatcher, name, cmd)
DeviceDispatcherClass.cmd_list[name] = \
[[in_type, doc], [out_type, ""]]