diff options
author | Bodo-Merle Sandor <sbodomerle@gmail.com> | 2018-06-02 14:35:03 +0200 |
---|---|---|
committer | Bodo-Merle Sandor <sbodomerle@gmail.com> | 2018-06-02 14:35:03 +0200 |
commit | 5bab4440d34980f3722a8e784b6a72804acf5c72 (patch) | |
tree | 88360d94c9cc7516d3494f30b2c773d0f9a44a1d /tango | |
parent | c5e62c5a005860459d6ec6b8d104347003e981e9 (diff) |
New upstream version 9.2.3
Diffstat (limited to 'tango')
-rw-r--r-- | tango/asyncio_executor.py | 8 | ||||
-rw-r--r-- | tango/attribute_proxy.py | 2 | ||||
-rw-r--r-- | tango/databaseds/database.py | 49 | ||||
-rw-r--r-- | tango/device_proxy.py | 37 | ||||
-rw-r--r-- | tango/futures_executor.py | 1 | ||||
-rw-r--r-- | tango/gevent_executor.py | 76 | ||||
-rw-r--r-- | tango/green.py | 19 | ||||
-rw-r--r-- | tango/release.py | 4 | ||||
-rw-r--r-- | tango/tango_object.py | 4 |
9 files changed, 104 insertions, 96 deletions
diff --git a/tango/asyncio_executor.py b/tango/asyncio_executor.py index 5eb0560..b1bdf5e 100644 --- a/tango/asyncio_executor.py +++ b/tango/asyncio_executor.py @@ -15,11 +15,6 @@ from __future__ import absolute_import # Imports import functools -try: - from threading import get_ident -except: - from threading import _get_ident as get_ident - # Asyncio imports try: import asyncio @@ -65,6 +60,7 @@ class AsyncioExecutor(AbstractExecutor): default_wait = False def __init__(self, loop=None, subexecutor=None): + super(AsyncioExecutor, self).__init__() if loop is None: try: loop = asyncio.get_event_loop() @@ -94,7 +90,7 @@ class AsyncioExecutor(AbstractExecutor): def execute(self, fn, *args, **kwargs): """Execute an operation and return the result.""" - if self.loop._thread_id == get_ident(): + if self.in_executor_context(): corofn = asyncio.coroutine(lambda: fn(*args, **kwargs)) return corofn() future = self.submit(fn, *args, **kwargs) diff --git a/tango/attribute_proxy.py b/tango/attribute_proxy.py index e69c9be..fda21a4 100644 --- a/tango/attribute_proxy.py +++ b/tango/attribute_proxy.py @@ -29,7 +29,7 @@ from .device_proxy import __init_device_proxy_internals as init_device_proxy __all__ = ["AttributeProxy", "attribute_proxy_init", "get_attribute_proxy"] -@green(consume_green_mode=True) +@green(consume_green_mode=False) def get_attribute_proxy(*args, **kwargs): """ get_attribute_proxy(self, full_attr_name, green_mode=None, wait=True, timeout=True) -> AttributeProxy diff --git a/tango/databaseds/database.py b/tango/databaseds/database.py index 9c4c7f3..6d6aa53 100644 --- a/tango/databaseds/database.py +++ b/tango/databaseds/database.py @@ -16,14 +16,11 @@ except ImportError: import tango from tango import AttrWriteType, GreenMode -from tango.server import Device, DeviceMeta +from tango.server import Device from tango.server import attribute, command -from tango.server import class_property -from tango.server import device_property from tango.server import run -from tango.globals import get_class, get_class_by_class, \ - get_constructed_class_by_class +from tango.globals import get_class_by_class, get_constructed_class_by_class READ_ONLY = AttrWriteType.READ @@ -31,10 +28,12 @@ WRITE_ONLY = AttrWriteType.WRITE READ_WRITE = AttrWriteType.READ_WRITE READ_WITH_WRITE = AttrWriteType.READ_WITH_WRITE -#Argument Options +# Argument Options global options global WILDCARD_REPLACEMENT WILDCARD_REPLACEMENT = True + + class DbInter(tango.Interceptors): def create_thread(self): @@ -43,12 +42,15 @@ class DbInter(tango.Interceptors): def delete_thread(self): pass + DB_NAME = None + def set_db_name(db_name): global DB_NAME DB_NAME = db_name + def get_db_name(): return DB_NAME @@ -59,6 +61,7 @@ th_exc = tango.Except.throw_exception from .db_errors import * + def check_device_name(dev_name): if '*' in dev_name: return False, None, None @@ -69,7 +72,7 @@ def check_device_name(dev_name): dev_name = dev_name[5:] if dev_name.startswith("//"): dev_name = dev_name[2:] - if not '/' in dev_name or dev_name.startswith("/"): + if '/' not in dev_name or dev_name.startswith("/"): return False, None, None dfm = dev_name.split("/") if len(dfm) != 3: @@ -79,6 +82,7 @@ def check_device_name(dev_name): return False, None, None return True, dev_name, dfm + def replace_wildcard(text): if not WILDCARD_REPLACEMENT: return text @@ -94,6 +98,7 @@ def replace_wildcard(text): text = text.replace("*", "%") return text + class TimeStructure: def __init__(self): self.average = 0 @@ -104,8 +109,10 @@ class TimeStructure: self.calls = 0 self.index = '' + def stats(f): fname = f.__name__ + @functools.wraps(f) def wrapper(self, *args, **kwargs): start = time.time() @@ -116,6 +123,7 @@ def stats(f): update_timing_stats(self, start, end, fname) return wrapper + def update_timing_stats(dev, time_before, time_after, cmd_name): tmp_time = dev.timing_maps[cmd_name] time_elapsed = (time_after - time_before) * 1000. @@ -125,7 +133,12 @@ def update_timing_stats(dev, time_before, time_after, cmd_name): if time_elapsed < tmp_time.minimum or tmp_time.minimum == 0: tmp_time.minimum = time_elapsed tmp_time.calls = tmp_time.calls + 1 - tmp_time.average = tmp_time.total_elapsed/tmp_time.calls + tmp_time.average = tmp_time.total_elapsed / tmp_time.calls + + +def get_plugin(name): + fullname = '%s.%s' % (db_access.__package__, name) + return __import__(fullname, None, None, fullname) class DataBase(Device): @@ -135,27 +148,26 @@ class DataBase(Device): # --- attributes --------------------------------------- - Timing_maximum = attribute(dtype=('float64',),max_dim_x=128, access=READ_ONLY) + Timing_maximum = attribute(dtype=('float64',), max_dim_x=128, access=READ_ONLY) - Timing_average = attribute(dtype=('float64',),max_dim_x=128, access=READ_ONLY) + Timing_average = attribute(dtype=('float64',), max_dim_x=128, access=READ_ONLY) - Timing_index = attribute(dtype=('str',),max_dim_x=128, access=READ_ONLY) + Timing_index = attribute(dtype=('str',), max_dim_x=128, access=READ_ONLY) - Timing_calls = attribute(dtype=('float64',),max_dim_x=128, access=READ_ONLY) + Timing_calls = attribute(dtype=('float64',), max_dim_x=128, access=READ_ONLY) - Timing_info = attribute(dtype=('str',),max_dim_x=128, access=READ_ONLY) + Timing_info = attribute(dtype=('str',), max_dim_x=128, access=READ_ONLY) StoredProcedureRelease = attribute(dtype='str', access=READ_ONLY) - Timing_minimum = attribute(dtype=('float64',),max_dim_x=128, access=READ_ONLY) + Timing_minimum = attribute(dtype=('float64',), max_dim_x=128, access=READ_ONLY) def init_device(self): - self._log = log = logging.getLogger(self.get_name()) + self._log = logging.getLogger(self.get_name()) self._log.debug("In init_device()") self.attr_StoredProcedureRelease_read = '' self.init_timing_stats() - m = __import__('%s.%s' % (db_access.__package__,options.db_access),None,None, - '%s.%s' % (db_access.__package__,options.db_access)) + m = get_plugin(options.db_access) self.db = m.get_db(personal_name = options.argv[1]) try: global WILDCARD_REPLACEMENT @@ -1749,6 +1761,9 @@ def main(argv = None): (options,args) = parser.parse_args(argv) options.argv = ["DataBaseds"] + args + # Check plugin availability + get_plugin(options.db_access) + port = options.port if port is None: try: diff --git a/tango/device_proxy.py b/tango/device_proxy.py index 4a90c35..c8a05b1 100644 --- a/tango/device_proxy.py +++ b/tango/device_proxy.py @@ -1563,39 +1563,40 @@ def __init_DeviceProxy(): DeviceProxy.write_read_attribute = green(__DeviceProxy__write_read_attribute) DeviceProxy.write_read_attributes = green(__DeviceProxy__write_read_attributes) - DeviceProxy.read_attributes_asynch = __DeviceProxy__read_attributes_asynch - DeviceProxy.read_attribute_asynch = __DeviceProxy__read_attribute_asynch - DeviceProxy.read_attribute_reply = __DeviceProxy__read_attribute_reply - DeviceProxy.write_attributes_asynch = __DeviceProxy__write_attributes_asynch - DeviceProxy.write_attribute_asynch = __DeviceProxy__write_attribute_asynch - DeviceProxy.write_attribute_reply = __DeviceProxy__write_attribute_reply + DeviceProxy.read_attributes_asynch = green(__DeviceProxy__read_attributes_asynch) + DeviceProxy.read_attribute_asynch = green(__DeviceProxy__read_attribute_asynch) + DeviceProxy.read_attribute_reply = green(__DeviceProxy__read_attribute_reply) + DeviceProxy.write_attributes_asynch = green(__DeviceProxy__write_attributes_asynch) + DeviceProxy.write_attribute_asynch = green(__DeviceProxy__write_attribute_asynch) + DeviceProxy.write_attribute_reply = green(__DeviceProxy__write_attribute_reply) DeviceProxy.read_pipe = green(__DeviceProxy__read_pipe) DeviceProxy.write_pipe = green(__DeviceProxy__write_pipe) - DeviceProxy.get_property = __DeviceProxy__get_property - DeviceProxy.put_property = __DeviceProxy__put_property - DeviceProxy.delete_property = __DeviceProxy__delete_property - DeviceProxy.get_property_list = __DeviceProxy__get_property_list - DeviceProxy.get_attribute_config = __DeviceProxy__get_attribute_config - DeviceProxy.get_attribute_config_ex = __DeviceProxy__get_attribute_config_ex - DeviceProxy.set_attribute_config = __DeviceProxy__set_attribute_config + DeviceProxy.get_property = green(__DeviceProxy__get_property) + DeviceProxy.put_property = green(__DeviceProxy__put_property) + DeviceProxy.delete_property = green(__DeviceProxy__delete_property) + DeviceProxy.get_property_list = green(__DeviceProxy__get_property_list) + DeviceProxy.get_attribute_config = green(__DeviceProxy__get_attribute_config) + DeviceProxy.get_attribute_config_ex = green(__DeviceProxy__get_attribute_config_ex) + DeviceProxy.set_attribute_config = green(__DeviceProxy__set_attribute_config) - DeviceProxy.get_command_config = __DeviceProxy__get_command_config + DeviceProxy.get_command_config = green(__DeviceProxy__get_command_config) - DeviceProxy.get_pipe_config = __DeviceProxy__get_pipe_config - DeviceProxy.set_pipe_config = __DeviceProxy__set_pipe_config + DeviceProxy.get_pipe_config = green(__DeviceProxy__get_pipe_config) + DeviceProxy.set_pipe_config = green(__DeviceProxy__set_pipe_config) DeviceProxy.__get_event_map = __DeviceProxy__get_event_map DeviceProxy.__get_event_map_lock = __DeviceProxy__get_event_map_lock + DeviceProxy.subscribe_event = green( __DeviceProxy__subscribe_event, consume_green_mode=False) DeviceProxy.unsubscribe_event = green(__DeviceProxy__unsubscribe_event) - DeviceProxy.__unsubscribe_event_all = __DeviceProxy__unsubscribe_event_all DeviceProxy.get_events = __DeviceProxy__get_events + DeviceProxy.__unsubscribe_event_all = __DeviceProxy__unsubscribe_event_all + DeviceProxy.__str__ = __DeviceProxy__str DeviceProxy.__repr__ = __DeviceProxy__str - DeviceProxy._get_info_ = __DeviceProxy___get_info_ diff --git a/tango/futures_executor.py b/tango/futures_executor.py index a6dedc2..f7c2978 100644 --- a/tango/futures_executor.py +++ b/tango/futures_executor.py @@ -46,6 +46,7 @@ class FuturesExecutor(AbstractExecutor): default_wait = True def __init__(self, process=False, max_workers=20): + super(FuturesExecutor, self).__init__() cls = ProcessPoolExecutor if process else ThreadPoolExecutor self.subexecutor = cls(max_workers=max_workers) diff --git a/tango/gevent_executor.py b/tango/gevent_executor.py index 02368f1..0ea2b56 100644 --- a/tango/gevent_executor.py +++ b/tango/gevent_executor.py @@ -15,27 +15,26 @@ from __future__ import absolute_import # Imports import sys import six -import types import functools -# Combatibility imports -try: - from threading import get_ident -except: - from threading import _get_ident as get_ident - # Gevent imports import gevent.queue +import gevent.monkey +import gevent.threadpool + +# Bypass gevent monkey patching +ThreadSafeEvent = gevent.monkey.get_original('threading', 'Event') # Tango imports from .green import AbstractExecutor + __all__ = ["get_global_executor", "set_global_executor", "GeventExecutor"] # Global executor _EXECUTOR = None - +_THREAD_POOL = None def get_global_executor(): global _EXECUTOR @@ -49,18 +48,11 @@ def set_global_executor(executor): _EXECUTOR = executor -# Patch for gevent threadpool - def get_global_threadpool(): - """Before gevent-1.1.0, patch the spawn method to propagate exception - raised in the loop to the AsyncResult. - """ - threadpool = gevent.get_hub().threadpool - if gevent.version_info < (1, 1) and not hasattr(threadpool, '_spawn'): - threadpool._spawn = threadpool.spawn - threadpool.spawn = types.MethodType( - spawn, threadpool, type(threadpool)) - return threadpool + global _THREAD_POOL + if _THREAD_POOL is None: + _THREAD_POOL = ThreadPool(maxsize=10**4) + return _THREAD_POOL class ExceptionWrapper: @@ -70,7 +62,7 @@ class ExceptionWrapper: self.tb = tb -def wrap_errors(func): +def wrap_error(func): @functools.wraps(func) def wrapper(*args, **kwargs): try: @@ -81,34 +73,32 @@ def wrap_errors(func): return wrapper -def get_with_exception(result, block=True, timeout=None): - result = result._get(block, timeout) +def unwrap_error(source): + result = source.get() if isinstance(result, ExceptionWrapper): # Raise the exception using the caller context six.reraise(result.exception, result.error_string, result.tb) return result -def spawn(threadpool, fn, *args, **kwargs): - # The gevent threadpool do not raise exception with async results, - # we have to wrap it - fn = wrap_errors(fn) - result = threadpool._spawn(fn, *args, **kwargs) - result._get = result.get - result.get = types.MethodType(get_with_exception, result, type(result)) - return result +class ThreadPool(gevent.threadpool.ThreadPool): + + def spawn(self, fn, *args, **kwargs): + fn = wrap_error(fn) + fn_result = super(ThreadPool, self).spawn(fn, *args, **kwargs) + return gevent.spawn(unwrap_error, fn_result) # Gevent task and event loop class GeventTask: - def __init__(self, event, func, *args, **kwargs): - self.event = event + def __init__(self, func, *args, **kwargs): self.func = func self.args = args self.kwargs = kwargs self.value = None self.exception = None + self.event = ThreadSafeEvent() def run(self): try: @@ -130,22 +120,13 @@ class GeventTask: class GeventLoop: def __init__(self): - self.thread_id = get_ident() - self.tasks = gevent.queue.Queue() - self.loop = gevent.spawn(self.run) - - def run(self): - while True: - self.tasks.get().spawn() - - def is_gevent_thread(self): - return self.thread_id == get_ident() + self.hub = gevent.get_hub() def submit(self, func, *args, **kwargs): - event = gevent._threading.Event() - task = GeventTask(event, func, *args, **kwargs) - self.tasks.put_nowait(task) - self.tasks.hub.loop.async().send() + task = GeventTask(func, *args, **kwargs) + watcher = self.hub.loop.async() + watcher.start(task.spawn) + watcher.send() return task @@ -158,6 +139,7 @@ class GeventExecutor(AbstractExecutor): default_wait = True def __init__(self, loop=None, subexecutor=None): + super(GeventExecutor, self).__init__() if loop is None: loop = GeventLoop() if subexecutor is None: @@ -178,7 +160,7 @@ class GeventExecutor(AbstractExecutor): def execute(self, fn, *args, **kwargs): """Execute an operation and return the result.""" - if self.loop.is_gevent_thread(): + if self.in_executor_context(): return fn(*args, **kwargs) task = self.submit(fn, *args, **kwargs) return task.result() diff --git a/tango/green.py b/tango/green.py index 020c4d7..2bd4033 100644 --- a/tango/green.py +++ b/tango/green.py @@ -14,6 +14,12 @@ import os from functools import wraps +# Compatibility imports +try: + from threading import get_ident +except: + from threading import _get_ident as get_ident + # Tango imports from ._tango import GreenMode @@ -62,6 +68,12 @@ class AbstractExecutor(object): asynchronous = NotImplemented default_wait = NotImplemented + def __init__(self): + self.thread_id = get_ident() + + def in_executor_context(self): + return self.thread_id == get_ident() + def delegate(self, fn, *args, **kwargs): """Delegate an operation and return an accessor.""" if not self.asynchronous: @@ -89,10 +101,11 @@ class AbstractExecutor(object): def run(self, fn, args=(), kwargs={}, wait=None, timeout=None): if wait is None: wait = self.default_wait + # Wait and timeout are not supported in synchronous mode + if not self.asynchronous and (not wait or timeout): + raise ValueError('Not supported in synchronous mode') # Sychronous (no delegation) - if not self.asynchronous: - if not wait or timeout: - raise ValueError('Not supported in synchronous mode') + if not self.asynchronous or not self.in_executor_context(): return fn(*args, **kwargs) # Asynchronous delegation accessor = self.delegate(fn, *args, **kwargs) diff --git a/tango/release.py b/tango/release.py index 48cba49..83869be 100644 --- a/tango/release.py +++ b/tango/release.py @@ -44,8 +44,8 @@ class Release: - keywords: (seq<str>) list of keywords - license: (str) the license """ - name = 'PyTango' - version_info = (9, 2, 2) + name = 'pytango' + version_info = (9, 2, 3) version = '.'.join(map(str, version_info[:3])) release = ''.join(map(str, version_info[3:])) separator = '.' if 'dev' in release or 'post' in release else '' diff --git a/tango/tango_object.py b/tango/tango_object.py index 825b1f4..7cac855 100644 --- a/tango/tango_object.py +++ b/tango/tango_object.py @@ -13,7 +13,7 @@ import os import sys -import types +import six import logging import weakref import inspect @@ -155,7 +155,7 @@ def create_tango_class(server, obj, tango_class_name=None, member_filter=None): if doc is None: doc = "" cmd.__doc__ = doc - cmd = types.MethodType(cmd, None, DeviceDispatcher) + cmd = six.create_unbound_method(cmd, DeviceDispatcher) setattr(DeviceDispatcher, name, cmd) DeviceDispatcherClass.cmd_list[name] = \ [[in_type, doc], [out_type, ""]] |