|
From: <ni...@us...> - 2010-11-15 19:58:52
|
Revision: 130
http://openautomation.svn.sourceforge.net/openautomation/?rev=130&view=rev
Author: nilss1
Date: 2010-11-15 19:58:41 +0000 (Mon, 15 Nov 2010)
Log Message:
-----------
Use Queue based readThreads and support subtypes (DS2438TH/DS2438THS)
Modified Paths:
--------------
PyWireGate/trunk/owfs_connector/OWFS_Connector.py
PyWireGate/trunk/owfs_connector/sensors.ini
Modified: PyWireGate/trunk/owfs_connector/OWFS_Connector.py
===================================================================
--- PyWireGate/trunk/owfs_connector/OWFS_Connector.py 2010-11-15 19:56:47 UTC (rev 129)
+++ PyWireGate/trunk/owfs_connector/OWFS_Connector.py 2010-11-15 19:58:41 UTC (rev 130)
@@ -22,22 +22,25 @@
import threading
import time
+from Queue import Empty,Full,Queue
+import heapq
+
+
class owfs_connector(Connector):
+
+
CONNECTOR_NAME = 'OWFS Connector'
CONNECTOR_VERSION = 0.2
CONNECTOR_LOGNAME = 'owfs_connector'
def __init__(self,parent, instanceName):
self._parent = parent
- if parent:
- self.WG = parent.WG
- else:
- self.WG = False
+ self.WG = parent.WG
self.instanceName = instanceName
self.mutex = threading.RLock()
defaultconfig = {
- 'cycletime' : 15,
+ 'cycletime' : 600,
'server' : '127.0.0.1',
'port' : 4304
}
@@ -61,7 +64,7 @@
self.supportedsensors[sensor] = {}
cycledefault = defaultconfig['cycletime']
if 'cycle' in sensorconfig[sensor]:
- cycledefault = sensorconfig[sensor]['cycle']
+ self.supportedsensors[sensor]['cycle'] = sensorconfig[sensor]['cycle']
del sensorconfig[sensor]['cycle']
if 'interfaces' in sensorconfig[sensor]:
self.supportedsensors[sensor]['interfaces'] = {}
@@ -70,6 +73,17 @@
## remove Interface key from dict
del sensorconfig[sensor]['interfaces']
+ if 'subtypes' in sensorconfig[sensor]:
+ subtypes = sensorconfig[sensor]['subtypes'].split(",")
+ self.supportedsensors[sensor]['subtypes'] = {}
+ for stype in subtypes:
+ id,subname = stype.split(":",1)
+ try:
+ self.supportedsensors[sensor]['subtypes'][int(id,16)] = subname
+ except:
+ pass
+ del sensorconfig[sensor]['subtypes']
+
for key in sensorconfig[sensor].keys():
if key.startswith("config_"):
try:
@@ -102,11 +116,11 @@
return config
def run(self):
- cnt = 10
+ busscantime = 0
while self.isrunning:
## every 10 runs search new sensors
- if cnt == 10:
- cnt = 0
+ if busscantime < time.time():
+ busscantime = time.time() + self.config['cycletime']
## find new sensors
#self.findsensors()
try:
@@ -122,27 +136,27 @@
## IDLE for cycletime seconds (default 15sec)
## Fixme: maybe optimize cycletime dynamic based on busload
- self.idle(self.config['cycletime'])
+ self.idle(.1)
+ for busname in self.busmaster.keys():
+ if self.busmaster[busname]['readthread'] <> None:
+ self.busmaster[busname]['readthread'].join()
- ## counter increment for sensorsearch
- cnt += 1
-
def findbusmaster(self,path=""):
## search for active busses
- nochilds = True
+ childs = False
uncachedpath = "/uncached%s" % path
for bus in self.owfs.dir(uncachedpath):
bus = bus[9:]
if self.isbus.search(bus):
- nochilds = False
+ childs = True
## get the bus ID
busname = self.isbus.search(bus)
if busname:
try:
- if self.findbusmaster(bus):
+ if not self.findbusmaster(bus):
## if this has no subbuses add it to the list
try:
self.mutex.acquire()
@@ -152,9 +166,11 @@
except KeyError:
## add to list
self.busmaster[bus] = {
- 'sensors' : {},
+ 'buscycle' : 600,
+ 'nextrun' : 0,
'lastseen' : time.time(),
- 'readthread' : None
+ 'readthread' : None,
+ 'readQueue' : ReadQueue(200)
}
finally:
self.mutex.release()
@@ -163,14 +179,12 @@
except:
## ignore all OWFS Errors
pass
- return nochilds
+ return childs
- def checkBusCycleTime(self,bus):
- pass
-
def findsensors(self,path=""):
uncachedpath = "/uncached%s" % path
+ mincycle = 600
for sensor in self.owfs.dir(uncachedpath):
## remove /uncached/bus.x from sensorname
sensor = sensor.split("/")[-1]
@@ -184,82 +198,142 @@
except:
## ignore all OWFS Errors
continue
+ if 'subtypes' in self.supportedsensors[sensortype]:
+ ##check subtypes
+ try:
+ info = ord(self.owfs.read("%s/pages/page.3" % sensor)[0])
+ sensortype += self.supportedsensors[sensortype]['subtypes'][info]
+ self.debug("Sensor Subtype %s is used" % sensortype)
+ except:
+ ## nothing found
+ pass
+
if sensortype not in self.supportedsensors:
self.debug("unsupported Type: %r" % sensortype)
continue
+
if 'interfaces' not in self.supportedsensors[sensortype]:
self.debug("Sensor Type: %r has no supported Interfaces" % sensortype)
continue
- ### add it to the list of active sensors
- ## FIXME: check for old sensor no longer active and remove
+ sensorlist = "sensors"
+ cycle = 600
+ if 'cycle' in self.supportedsensors[sensortype]:
+ cycle = self.supportedsensors[sensortype]['cycle']
+
try:
self.mutex.acquire()
- self.busmaster[path]['sensors'][sensor] = {
+ print "Sensortype: %s config: %r" % (sensortype,self.supportedsensors[sensortype]['interfaces'])
+
+ self.sensors[sensor] = {
'type':sensortype,
+ 'cycle':cycle,
+ 'nextrun':0,
'interfaces': self.supportedsensors[sensortype]['interfaces']
}
finally:
self.mutex.release()
+
+ self._addQueue(path,sensor)
+
+ def read(self):
+ for busname in self.busmaster.keys():
+ if not self.busmaster[busname]['readQueue'].empty():
+ if not self.busmaster[busname]['readthread']:
+ self.debug("Start read Thread for %s" % busname)
+ threadname = "OWFS-Reader_%s" % busname
+ try:
+ self.mutex.acquire()
+ self.busmaster[busname]['readthread'] = threading.Thread(target=self._readThread,args=[busname],name=threadname)
+ self.busmaster[busname]['readthread'].start()
+ finally:
+ self.mutex.release()
-
- def _read(self,busname):
- ## loop through all sensors in lokal busmaster list
- self.debug("Thread running for %s" % busname)
- readtime = time.time()
- for sensor in self.busmaster[busname]['sensors'].keys():
- #self.sensors[sensor]['power'] = self.owfs.read("/"+sensor+"/power")
+ def _read(self,busname,sensor):
+ for get in self.sensors[sensor]['interfaces'].keys():
+ ## make an id for the sensor (OW:28.043242a32_temperature
+ id = "%s:%s_%s" % (self.instanceName,sensor,get)
+ ## get the Datastore Object and look for config
+ obj = self.WG.DATASTORE.get(id)
- ## loop through their interfaces
- for get in self.busmaster[busname]['sensors'][sensor]['interfaces'].keys():
- resolution = ""
- id = "%s:%s_%s" % (self.instanceName,sensor,get)
+ sensortype = self.sensors[sensor]['type']
- ## get the Datastore Object and look for config
- obj = self.WG.DATASTORE.get(id)
- sensortype = self.busmaster[busname]['sensors'][sensor]['type']
- self.checkConfigDefaults(obj,self.supportedsensors[sensortype]['interfaces'][get])
- if "resolution" in obj.config:
- resolution = str(obj.config['resolution'])
-
- owfspath = "/uncached/%s/%s%s" % (sensor,get,resolution)
- self.debug("Reading from path %s" % owfspath)
- data = False
- try:
- ## read uncached and put into local-list
- data = self.owfs.read(owfspath)
- except:
- ## ignore all OWFS Errors
- #self.WG.errorlog("Reading from path %s failed" % owfspath)
- self.log("Reading from path %s failed" % owfspath)
+ ## recheck config
+ self.checkConfigDefaults(obj,self.supportedsensors[sensortype]['interfaces'][get])
+
+ owfspath = "/uncached/%s/%s%s" % (sensor,get,obj.config.get('resolution',''))
+ self.debug("Reading from path %s" % owfspath)
- try:
- self.mutex.acquire()
- self.busmaster[busname]['sensors'][sensor][get] = data
- finally:
- self.mutex.release()
- ## make an id for the sensor (OW:28.043242a32_temperature
- try:
- ## only if there is any Data update it in the DATASTORE
- if self.busmaster[busname]['sensors'][sensor][get]:
- self.WG.DATASTORE.update(id,self.busmaster[busname]['sensors'][sensor][get])
- except:
- self.WG.errorlog()
- self.busmaster[busname]['readthread'] = None
- self.debug("Thread for %s finshed reading %d sensors in %f secs " % (busname,len(self.busmaster[busname]['sensors']), time.time() - readtime))
-
- def read(self):
- for busname in self.busmaster.keys():
- if len(self.busmaster[busname]['sensors'])>0:
- if not self.busmaster[busname]['readthread']:
- self.debug("Start read Thread for %s" % busname)
- threadname = "OWFS-Reader_%s" % busname
- try:
- self.mutex.acquire()
- self.busmaster[busname]['readthread'] = threading.Thread(target=self._read,args=[busname],name=threadname)
- self.busmaster[busname]['readthread'].start()
- finally:
- self.mutex.release()
+ data = None
+ try:
+ ## read uncached and put into local-list
+ data = self.owfs.read(owfspath)
+ except:
+ ## ignore all OWFS Errors
+ #self.WG.errorlog("Reading from path %s failed" % owfspath)
+ self.log("Reading from path %s failed" % owfspath)
+ if data:
+ self.debug("%s: %r" % (id,data))
+ self.WG.DATASTORE.update(id,data)
+ self._addQueue(busname,sensor)
+ def _addQueue(self,busname,sensor):
+ cycletime = time.time() +self.sensors[sensor]['cycle']
+ self.debug("ADDED %s on %s with %s (%d)s" % (sensor,busname, time.asctime(time.localtime(cycletime)),self.sensors[sensor]['cycle']))
+ ## FIXME: not present iButtons should be added to all Busmaster Queues
+ #if self.sensors[sensor]['']
+ self.busmaster[busname]['readQueue'].put((cycletime,sensor))
+
+
+ def _readThread(self,busname):
+ try:
+ while self.isrunning:
+ while not self.busmaster[busname]['readQueue'].check():
+ if not self.isrunning:
+ break
+ time.sleep(.1)
+ rtime, sensor = self.busmaster[busname]['readQueue'].get()
+ self._read(busname,sensor)
+ finally:
+ if self.isrunning:
+ self.busmaster[busname]['readthread'] = None
+
+
+
+class ReadQueue(Queue):
+ def _init(self, maxsize):
+ self.maxsize = maxsize
+ self.queue = []
+
+ def _qsize(self):
+ return len(self.queue)
+
+ # Check whether the queue is empty
+ def _empty(self):
+ return not self.queue
+
+ # Check whether the queue is full
+ def _full(self):
+ return self.maxsize > 0 and len(self.queue) == self.maxsize
+
+ # Put a new item in the queue
+ def _put(self, item):
+ ## only new Sensors
+ if not filter(lambda x: x[1]==item[1],self.queue):
+ heapq.heappush(self.queue,item)
+
+ # Get an item from the queue
+ def _get(self):
+ return heapq.heappop(self.queue)
+
+ def check(self):
+ if len(self.queue) == 0:
+ return False
+ next = min(self.queue)
+ if len(next)==2:
+ next = next[0]
+ else:
+ next = 0
+ return (next <= time.time())
\ No newline at end of file
Modified: PyWireGate/trunk/owfs_connector/sensors.ini
===================================================================
--- PyWireGate/trunk/owfs_connector/sensors.ini 2010-11-15 19:56:47 UTC (rev 129)
+++ PyWireGate/trunk/owfs_connector/sensors.ini 2010-11-15 19:58:41 UTC (rev 130)
@@ -28,9 +28,17 @@
# Battery-monitor with Vxx and current, family 26
# /pages/page.3 conatins type: 0x19 or 0xF1 = TH, 0xF2 = THS
# FIXME: several more (not widely used) to be added
+subtypes = 0x19:TH,0xf1:TH,0xF2:THS
cycle = 60
interfaces = temperature,HIH4000/humidity,vis,VAD,VDD
+[DS2438TH]
+cycle = 60
+interfaces = temperature,HIH4000/humidity
+
+[DS2438THS]
+cycle = 60
+
[DS2406]
# dual switch / IO, family 12
# PIO.[A|B] if used as output
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|