|
From: <ni...@us...> - 2010-11-25 12:25:40
|
Revision: 163
http://openautomation.svn.sourceforge.net/openautomation/?rev=163&view=rev
Author: nilss1
Date: 2010-11-25 12:25:33 +0000 (Thu, 25 Nov 2010)
Log Message:
-----------
Scan group addresses; load physical addresses on startup for healthcheck; some fixes.
Modified Paths:
--------------
PyWireGate/trunk/WireGate.py
PyWireGate/trunk/connector.py
PyWireGate/trunk/daemon.py
PyWireGate/trunk/datastore.py
PyWireGate/trunk/knx_connector/BusMonitor.py
PyWireGate/trunk/knx_connector/KNX_Connector.py
Modified: PyWireGate/trunk/WireGate.py
===================================================================
--- PyWireGate/trunk/WireGate.py 2010-11-24 11:27:22 UTC (rev 162)
+++ PyWireGate/trunk/WireGate.py 2010-11-25 12:25:33 UTC (rev 163)
@@ -183,6 +183,10 @@
for obj in self.watchdoglist.keys():
if time.time() > self.watchdoglist[obj]:
self.log("\n\nInstanz %s reagiert nicht\n\n" % obj,'error')
+ try:
+ self.connectors[obj].shutdown()
+ except:
+ pass
del self.watchdoglist[obj]
## set Watchdog
Modified: PyWireGate/trunk/connector.py
===================================================================
--- PyWireGate/trunk/connector.py 2010-11-24 11:27:22 UTC (rev 162)
+++ PyWireGate/trunk/connector.py 2010-11-25 12:25:33 UTC (rev 163)
@@ -15,7 +15,7 @@
def start(self):
self.log("%s (%s) starting up" % (self.CONNECTOR_NAME, self.instanceName) ,'info','WireGate')
self.isrunning=True
- self._thread = threading.Thread(target=self.run,name=__name__)
+ self._thread = threading.Thread(target=self.run,name="%s_main" % self.instanceName)
self._thread.setDaemon(1)
self._thread.start()
@@ -32,7 +32,8 @@
self.isrunning=False
if hasattr(self,'_shutdown'):
self._shutdown()
- self._thread.join(2)
+ if self._thread.isALive():
+ self._thread.join(2)
if self._thread.isAlive():
self.log("Shutdown Failed",'critical')
Modified: PyWireGate/trunk/daemon.py
===================================================================
--- PyWireGate/trunk/daemon.py 2010-11-24 11:27:22 UTC (rev 162)
+++ PyWireGate/trunk/daemon.py 2010-11-25 12:25:33 UTC (rev 163)
@@ -13,7 +13,7 @@
print e
sys.exit()
-class Daemon:
+class Daemon(object):
"""
A generic daemon class.
Modified: PyWireGate/trunk/datastore.py
===================================================================
--- PyWireGate/trunk/datastore.py 2010-11-24 11:27:22 UTC (rev 162)
+++ PyWireGate/trunk/datastore.py 2010-11-25 12:25:33 UTC (rev 163)
@@ -100,6 +100,8 @@
self.locked.release()
return self.dataobjects[id]
+ def namespaceRead(self,namespace):
+ return filter(lambda x,y=namespace: x.namespace == y,self.dataobjects.values())
def load(self):
self.debug("load DATASTORE")
@@ -200,7 +202,7 @@
namespace = namespace[0]
else:
## Fixme: maybe default Namespace
- namespace = ""
+ namespace = "UNKNOWN"
self.namespace = namespace
if not name:
@@ -249,7 +251,7 @@
if self.namespace:
try:
self.write_mutex.acquire()
- self._setValue = self.WG.connectors[self.namespace].setValue
+ #self._setValue = self.WG.connectors[self.namespace].setValue
self.WG.connectors[self.namespace].setValue(refered_self)
finally:
self.write_mutex.release()
Modified: PyWireGate/trunk/knx_connector/BusMonitor.py
===================================================================
--- PyWireGate/trunk/knx_connector/BusMonitor.py 2010-11-24 11:27:22 UTC (rev 162)
+++ PyWireGate/trunk/knx_connector/BusMonitor.py 2010-11-25 12:25:33 UTC (rev 163)
@@ -271,7 +271,7 @@
def log(self,msg,severity='info',instance=False):
if not instance:
- instance = self.instanceName
+ instance = 'KNX'
self._parent.log(msg,severity,instance)
Modified: PyWireGate/trunk/knx_connector/KNX_Connector.py
===================================================================
--- PyWireGate/trunk/knx_connector/KNX_Connector.py 2010-11-24 11:27:22 UTC (rev 162)
+++ PyWireGate/trunk/knx_connector/KNX_Connector.py 2010-11-25 12:25:33 UTC (rev 163)
@@ -55,6 +55,7 @@
self.eibmutex = threading.RLock()
self.DeviceList = {}
+ self.DeviceListLock = threading.RLock()
self.sendQueue = KNXSendQueue(maxsize=5000)
self.QueueWaitTime = 0.0
@@ -64,7 +65,7 @@
self.dpt = DPT_Types.dpt_type(self)
- self.GrpAddrRegex = re.compile(r"(?:|(\d+)\x2F)(\d+)\x2F(\d+)$",re.MULTILINE)
+ self.AddrRegex = re.compile(r"(?:|(\d+)[\x2F|\x2E])(\d+)[\x2F|\x2E](\d+)$",re.MULTILINE)
## Deafaultconfig
defaultconfig = {
@@ -83,6 +84,7 @@
self._sendThread = threading.Thread()
self._checkThread = threading.Thread()
+
## Start the Thread
self.start()
@@ -94,17 +96,18 @@
self.statistics()
try:
## wait a second for the Busmon to activate
+ if not self._readThread.isAlive():
+ self._readThread = threading.Thread(target=self._sendloop,name="%s_read" % self.instanceName)
+ self._readThread.setDaemon(0)
+ self._readThread.start()
if not self._sendThread.isAlive():
- self._sendThread = threading.Thread(target=self._run)
- self._sendThread.setDaemon(True)
+ self._sendThread = threading.Thread(target=self._run,name="%s_send" % self.instanceName)
+ self._sendThread.setDaemon(0)
self._sendThread.start()
- if not self._readThread.isAlive():
- self._readThread = threading.Thread(target=self._sendloop)
- self._readThread.setDaemon(True)
- self._readThread.start()
+
if not self._checkThread.isAlive():
- self._checkThread = threading.Thread(target=self._healthCheck)
- self._checkThread.setDaemon(True)
+ self._checkThread = threading.Thread(target=self._healthCheck,name="%s_check" % self.instanceName)
+ self._checkThread.setDaemon(0)
self._checkThread.start()
@@ -120,12 +123,13 @@
self.idle(5)
def _shutdown(self):
+ ##self.statistics()
for rthread in [self._checkThread,self._sendThread,self._readThread]:
try:
- rthread.join()
+ if rthread.isAlive():
+ rthread.join(2)
except:
pass
- self.statistics()
def _run(self):
try:
@@ -133,32 +137,39 @@
self.KNX.EIB_Cache_Enable()
if self.config['parser'] == "groupsocket":
self.debug("Using Groupsocket parser")
- self.KNX.EIBOpen_GroupSocket_async(0)
+ #self.KNX.EIBOpen_GroupSocket_async(0)
+ self.KNX.EIBOpen_GroupSocket(0)
else:
self.debug("Using Busmonitor Parser")
+ #self.KNX.EIBOpenVBusmonitor()
self.KNX.EIBOpenVBusmonitor()
while self.isrunning:
## Check if we are alive and responde until 10 secs
self.WG.watchdog(self.instanceName,10)
- if self.config['parser'] == "busmonior":
+ if self.config['parser'] == "groupsocket":
+ self.KNX.EIBGetGroup_Src(self.KNXBuffer,self.KNXSrc,self.KNXDst)
+ ## Only decode packets larger than 1 octet
+ try:
+ self.DeviceListLock.acquire()
+ if self.KNXSrc.data not in self.DeviceList:
+ self.DeviceList[self.KNXSrc.data] = self.groupsocket._decodePhysicalAddr(self.KNXSrc.data)
+ finally:
+ self.DeviceListLock.release()
+ if len(self.KNXBuffer.buffer) > 1 :
+ self.groupsocket.decode(self.KNXBuffer.buffer,self.KNXSrc.data,self.KNXDst.data)
+ else:
self.KNX.EIBGetBusmonitorPacket(self.KNXBuffer)
## Only decode packets larger than 7 octets
if len(self.KNXBuffer.buffer) > 7 :
self.busmon.decode(self.KNXBuffer.buffer)
- else:
- self.KNX.EIBGetGroup_Src(self.KNXBuffer,self.KNXSrc,self.KNXDst)
- ## Only decode packets larger than 1 octet
- if self.KNXSrc.data not in self.DeviceList:
- self.DeviceList[self.KNXSrc.data] = self.groupsocket._decodePhysicalAddr(self.KNXSrc.data)
- if len(self.KNXBuffer.buffer) > 1 :
- self.groupsocket.decode(self.KNXBuffer.buffer,self.KNXSrc.data,self.KNXDst.data)
+
finally:
self.KNX.EIBClose()
- def str2grpaddr(self,addrstr):
- grpaddr = self.GrpAddrRegex.findall(addrstr)
+ def str2addr(self,addrstr):
+ grpaddr = self.AddrRegex.findall(addrstr)
if not grpaddr:
return False
## regex result 1
@@ -166,15 +177,20 @@
addr = 0
## if GROUP3 Addr
if grpaddr[0]:
- addr = int(grpaddr[0]) << 11
+ if addrstr.find(".") < 1:
+ addr = int(grpaddr[0]) << 11
+ else:
+ addr = int(grpaddr[0]) << 12
addr = addr | (int(grpaddr[1]) << 8)
addr = addr | int(grpaddr[2])
return addr
+
def _sendloop(self):
addr = 0
msg = []
+ self.readconfig()
try:
while self.isrunning:
try:
@@ -183,6 +199,7 @@
try:
self.eibmutex.acquire()
self.QueueWaitTime += wtime
+ print "ADDR %r -- MSG %r" % (addr,msg)
self.KNX.EIBSendGroup(addr,msg)
finally:
self.eibmutex.release()
@@ -197,7 +214,7 @@
def send(self,msg,dstaddr,flag=KNXWRITEFLAG):
try:
- addr = self.str2grpaddr(dstaddr)
+ addr = self.str2addr(dstaddr)
if addr:
apdu = [0]
if type(msg) == int:
@@ -229,7 +246,14 @@
self.idle(10)
while self.isrunning:
if self.config['checktime'] > 0:
- for physaddr, device in self.DeviceList.items():
+ ## wait 5 Minutes
+ self.idle(self.config['checktime'])
+ try:
+ self.DeviceListLock.acquire()
+ devices = self.DeviceList.items()
+ finally:
+ self.DeviceListLock.release()
+ for physaddr, device in devices:
while self.sendQueue.qsize() > 5 and self.isrunning:
## no voltage check on higher busload
self.debug("SendQueue to busy wait check Voltage")
@@ -244,7 +268,10 @@
if 'ignorecheck' in obj.config:
continue
try:
- self.cKNX.EIB_MC_Connect(physaddr)
+ ret = self.cKNX.EIB_MC_Connect(physaddr)
+ if ret == -1:
+ self.cKNX.EIBReset()
+ continue
## read voltage
ret = self.cKNX.EIB_MC_ReadADC(1,1,ebuf)
try:
@@ -261,8 +288,6 @@
pass
## wait 1000ms between checks
self.idle(1)
- ## wait 5 Minutes
- self.idle(self.config['checktime'])
else:
self.idle(60)
finally:
@@ -301,13 +326,32 @@
try:
self.eibmutex.acquire()
+ waittime = self.QueueWaitTime / stime
self.debug("QueueWait: %r in %r sec" % (self.QueueWaitTime , stime))
- waittime = self.QueueWaitTime / stime
self.QueueWaitTime = 0.0
finally:
self.eibmutex.release()
+
+ #print "THREADS %r" % threading.enumerate()
id = "%s:STATS-QueueWaitTime" % (self.instanceName)
self.WG.DATASTORE.update(id,waittime)
+
+
+ def readconfig(self):
+ getPhysical = re.compile(":PHY_((?:1[0-5]|[0-9])\x2E(?:1[0-5]|[0-9])\x2E(?:[0-9]{1,3}))")
+ instLength = len(self.instanceName)
+ for obj in self.WG.DATASTORE.namespaceRead(self.instanceName):
+ physaddr = getPhysical.findall(obj.id.encode('iso8859-15'))
+ if physaddr:
+ try:
+ self.DeviceListLock.acquire()
+ self.DeviceList[self.str2addr(physaddr[0])] = physaddr[0]
+ finally:
+ self.DeviceListLock.release()
+ elif 'scan' in obj.config:
+ ## read all objects marked for scan
+ self.send(0,obj.id.encode('iso8859-15'),flag=KNXREADFLAG)
+
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|