|
From: <ni...@us...> - 2010-11-12 10:33:40
|
Revision: 112
http://openautomation.svn.sourceforge.net/openautomation/?rev=112&view=rev
Author: nilss1
Date: 2010-11-12 10:33:27 +0000 (Fri, 12 Nov 2010)
Log Message:
-----------
changed datastore cycles to apscheduler
Modified Paths:
--------------
PyWireGate/trunk/WireGate.py
PyWireGate/trunk/datastore.py
Modified: PyWireGate/trunk/WireGate.py
===================================================================
--- PyWireGate/trunk/WireGate.py 2010-11-12 08:59:20 UTC (rev 111)
+++ PyWireGate/trunk/WireGate.py 2010-11-12 10:33:27 UTC (rev 112)
@@ -23,8 +23,8 @@
import traceback
import daemon
import log
+import re
-
import ConfigParser
import datastore
@@ -41,7 +41,7 @@
self.REDIRECTIO = REDIRECTIO
## Get the path of this script
- self.scriptpath = str(datastore).split( )[3][1:-15]
+ self.scriptpath = re.findall("from \x27(.*)\x2F",str(datastore))[0]
self.readWireGateConfig()
Modified: PyWireGate/trunk/datastore.py
===================================================================
--- PyWireGate/trunk/datastore.py 2010-11-12 08:59:20 UTC (rev 111)
+++ PyWireGate/trunk/datastore.py 2010-11-12 10:33:27 UTC (rev 112)
@@ -15,6 +15,14 @@
sys.exit(1)
+try:
+ from apscheduler.scheduler import Scheduler as apscheduler
+except ImportError:
+ print >> sys.stderr, "apt-get install python-apscheduler"
+ sys.exit(1)
+
+
+
class datastore:
"""
Datastore Instance
@@ -34,8 +42,8 @@
self.DBLOADED = False
self.dataobjects = {}
- self.cycleThreadLock = threading.RLock()
- self.cycleThreads = {}
+ self.CYCLER = apscheduler()
+ self.CYCLER.start()
self.locked = threading.RLock()
self.locked.acquire()
@@ -85,7 +93,7 @@
except KeyError:
## create a new one if it don't exist
self.dataobjects[id] = dataObject(self,id)
- if connector:
+ if hasattr(connector,'get_ds_defaults'):
self.dataobjects[id].config = connector.get_ds_defaults(id)
## return it
self.locked.release()
@@ -143,34 +151,8 @@
dbfile.close()
- def attachThread(self,obj,threadObj=False):
- try:
- self.cycleThreadLock.acquire()
- ## check only
- if not threadObj:
- return obj in self.cycleThreads
- self.cycleThreads[obj] = threadObj
- finally:
- self.cycleThreadLock.release()
-
- return self.cycleThreads[obj]
-
-
- def removeThread(self,obj):
- self.cycleThreadLock.acquire()
- del self.cycleThreads[obj]
- self.cycleThreadLock.release()
-
-
def shutdown(self):
- self.cycleThreadLock.acquire()
- for obj in self.cycleThreads.keys():
- try:
- self.cycleThreads[obj].cancel()
- self.cycleThreads[obj].join()
- except:
- pass
- self.cycleThreadLock.release()
+ self.CYCLER.shutdown()
self.save()
@@ -193,9 +175,7 @@
class dataObject:
def __init__(self,parent,id,name=False):
self._parent = parent
- if parent:
- self.WG = parent.WG
-
+ self.WG = parent.WG
## Threadlocking
self.write_mutex = threading.RLock()
self.read_mutex = threading.RLock()
@@ -220,7 +200,7 @@
## guess that non unicode is iso8859
self.name = name.decode("iso-8859-15")
## some defaults
- self.value = u""
+ self.value = None
self.lastupdate = 0
self.id = id
@@ -231,6 +211,7 @@
self.config = {}
self.cyclestore = []
+ self._cyclejob = False
## connected Logics, communication objects ... goes here
self.connected = []
@@ -249,17 +230,23 @@
self.write_mutex.release()
def setValue(self,val,send=False):
- if 'sendcycle' in self.config:
- if not self.WG.DATASTORE.attachThread(self):
+ if 'sendcycle' in self.config and self.value <> None:
+ if not self._cyclejob:
self._parent.debug("start Cycle ID: %s" % self.id)
cycletime = float(self.config['sendcycle']) + self.lastupdate - time.time()
if cycletime < 0.0:
cycletime = 0
+
+ ## add value to List
self.cyclestore.append(val)
- ## FIXME: create global cycling Thread like in cycle.py
- _cyclethread = self.WG.DATASTORE.attachThread(self,threading.Timer(cycletime,self._cycle))
- #_cyclethread.setDaemon(1)
- _cyclethread.start()
+
+ ##
+ repeat = 1
+ if 'always' in self.config['sendcycleoption'].split(","):
+ repeat = 0
+ self.write_mutex.acquire()
+ self._cyclejob = self.WG.DATASTORE.CYCLER.add_interval_job(self._cycle,seconds=cycletime,repeat=repeat)
+ self.write_mutex.release()
else:
self._parent.debug("ignore Cycle ID: %s" % self.id)
self.cyclestore.append(val)
@@ -293,9 +280,9 @@
try:
self.WG.DATASTORE.dataobjects[attached].setValue(val,True)
except:
- print "FAILED %s" % attached
- __import__('traceback').print_exc(file=__import__('sys').stdout)
- pass
+ self.WG.log("sendconnected failed for %s" % attached,'error')
+ #__import__('traceback').print_exc(file=__import__('sys').stdout)
+
def getValue(self):
@@ -308,17 +295,36 @@
self.read_mutex.release()
def _cycle(self):
- self._parent.debug("execute Cycle ID: %s" % self.id)
- self.WG.DATASTORE.removeThread(self)
- val = self.getValue()
+ self._parent.debug("----------------------execute Cycle ID: %s" % self.id)
+ val = None
+ cycleopts = []
if 'sendcycleoption' in self.config:
- if self.config['sendcycleoption'] == 'average' and type(self.cyclestore[0]) in (int,float):
- val = type(self.cyclestore[0])(0)
- for i in self.cyclestore:
- val += i
- val = val / len(self.cyclestore)
- self._parent.debug("Cycle ID: %s average: %f (%r)" % (self.id, val, self.cyclestore ))
- self.cyclestore = []
+ cycleopts = self.config['sendcycleoption'].split(",")
+ ## average only possible with data in cyclesotre else DivisionByZero
+ if len(self.cyclestore) > 0:
+ ## Average/Min/Max only possible with int and float
+ if type(self.value) in (int,float):
+ val = type(self.value)(0)
+ if 'average' in cycleopts:
+ for i in self.cyclestore:
+ val += i
+ val = val / len(self.cyclestore)
+ self._parent.debug("Cycle ID: %s average: %f (%r)" % (self.id, val, self.cyclestore ))
+ ## default use last
+ else:
+ val = self.cyclestore.pop()
+ else:
+ val = self.cyclestore.pop()
+
+ if 'always' in cycleopts:
+ if val == None:
+ val = self.getValue()
else:
- val = self.cyclestore.pop()
- self._real_setValue(val,False)
+ self.write_mutex.acquire()
+ self._cyclejob = False
+ self.write_mutex.release()
+
+ ## reset cyclestore
+ self.cyclestore = []
+ if val <> None:
+ self._real_setValue(val,False)
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|