|
From: <ni...@us...> - 2010-11-22 14:58:33
|
Revision: 150
http://openautomation.svn.sourceforge.net/openautomation/?rev=150&view=rev
Author: nilss1
Date: 2010-11-22 14:58:25 +0000 (Mon, 22 Nov 2010)
Log Message:
-----------
Added statistics for bytes and telegrams sent
Modified Paths:
--------------
PyWireGate/trunk/knx_connector/KNX_Connector.py
Modified: PyWireGate/trunk/knx_connector/KNX_Connector.py
===================================================================
--- PyWireGate/trunk/knx_connector/KNX_Connector.py 2010-11-22 14:57:33 UTC (rev 149)
+++ PyWireGate/trunk/knx_connector/KNX_Connector.py 2010-11-22 14:58:25 UTC (rev 150)
@@ -27,7 +27,9 @@
from Queue import Empty,Full,Queue
import heapq
import threading
+import time
+
KNXREADFLAG = 0x00
KNXRESPONSEFLAG = 0x40
KNXWRITEFLAG = 0x80
@@ -41,15 +43,21 @@
self.WG = parent.WG
self.instanceName = instanceName
- self.KNX = EIBConnection.EIBConnection()
+ #self.KNX = EIBConnection.EIBConnection()
+ self.KNX = countEIBConnection()
+
+ self.cKNX = countEIBConnection()
+
self.KNXBuffer = EIBConnection.EIBBuffer()
self.KNXSrc = EIBConnection.EIBAddr()
self.KNXDst = EIBConnection.EIBAddr()
+ self.eibmutex = threading.RLock()
+
self.DeviceList = {}
self.sendQueue = KNXSendQueue(maxsize=5000)
-
+ self.QueueWaitTime = 0.0
self.busmon = BusMonitor.busmonitor(self)
self.groupsocket = GroupSocket.groupsocket(self)
@@ -71,27 +79,30 @@
## set local config
self.config = self.WG.config[self.instanceName]
- self._readThread = None
- self._sendThread = None
- self._checkThread = None
+ self._readThread = threading.Thread()
+ self._sendThread = threading.Thread()
+ self._checkThread = threading.Thread()
## Start the Thread
self.start()
def run(self):
+ cnt = 0
while self.isrunning:
## Create Socket
+ if cnt == 60 and self.isrunning:
+ self.statistics()
try:
## wait a second for the Busmon to activate
- if self._sendThread == None:
+ if not self._sendThread.isAlive():
self._sendThread = threading.Thread(target=self._run)
self._sendThread.setDaemon(True)
self._sendThread.start()
- if self._readThread == None:
+ if not self._readThread.isAlive():
self._readThread = threading.Thread(target=self._sendloop)
self._readThread.setDaemon(True)
self._readThread.start()
- if self._checkThread == None:
+ if not self._checkThread.isAlive():
self._checkThread = threading.Thread(target=self._healthCheck)
self._checkThread.setDaemon(True)
self._checkThread.start()
@@ -105,6 +116,7 @@
except:
self.WG.errorlog()
if self.isrunning:
+ cnt +=1
self.idle(5)
def _shutdown(self):
@@ -113,6 +125,7 @@
rthread.join()
except:
pass
+ self.statistics()
def _run(self):
try:
@@ -141,7 +154,6 @@
if len(self.KNXBuffer.buffer) > 1 :
self.groupsocket.decode(self.KNXBuffer.buffer,self.KNXSrc.data,self.KNXDst.data)
finally:
- self._readThread = None
self.KNX.EIBClose()
@@ -166,14 +178,21 @@
try:
while self.isrunning:
try:
- (addr,msg) = self.sendQueue.get(timeout=1)
- self.KNX.EIBSendGroup(addr,msg)
+ (addr,msg,wtime) = self.sendQueue.get(timeout=1)
+ wtime = time.time() - wtime
+ try:
+ self.eibmutex.acquire()
+ self.QueueWaitTime += wtime
+ self.KNX.EIBSendGroup(addr,msg)
+ finally:
+ self.eibmutex.release()
except Empty:
pass
except:
self.WG.errorlog("Failed send %r %r" % (addr,msg))
finally:
- self._sendThread = None
+ #self._sendThread = None
+ pass
def send(self,msg,dstaddr,flag=KNXWRITEFLAG):
@@ -204,14 +223,17 @@
def _healthCheck(self):
ebuf = EIBConnection.EIBBuffer()
- KNX = EIBConnection.EIBConnection()
self.debug("Starting HealthCheck")
try:
- KNX.EIBSocketURL(self.config['url'])
- self.idle(30)
+ self.cKNX.EIBSocketURL(self.config['url'])
+ self.idle(10)
while self.isrunning:
- if self.config['checktime'] >0:
+ if self.config['checktime'] > 0:
for physaddr, device in self.DeviceList.items():
+ while self.sendQueue.qsize() > 5 and self.isrunning:
+ ## no voltage check on higher busload
+ self.debug("SendQueue to busy wait check Voltage")
+ self.idle(5)
if not self.isrunning:
break
if physaddr < 4352:
@@ -221,18 +243,22 @@
obj = self.WG.DATASTORE.get(id)
if 'ignorecheck' in obj.config:
continue
- KNX.EIB_MC_Connect(physaddr)
- ## read voltage
- ret = KNX.EIB_MC_ReadADC(1,1,ebuf)
try:
- if ret > -1:
- value = ebuf.data * .15
- else:
- value = -1
- self.WG.DATASTORE.update(id,value)
- except:
+ self.cKNX.EIB_MC_Connect(physaddr)
+ ## read voltage
+ ret = self.cKNX.EIB_MC_ReadADC(1,1,ebuf)
+ try:
+ if ret > -1:
+ value = ebuf.data * .15
+ else:
+ value = -1
+ self.WG.DATASTORE.update(id,value)
+ except:
+ pass
+ #self.KNX.EIBReset()
+ self.cKNX.EIBComplete()
+ finally:
pass
- KNX.EIBReset()
## wait 1000ms between checks
self.idle(1)
## wait 5 Minutes
@@ -240,10 +266,51 @@
else:
self.idle(60)
finally:
- self._checkThread = None
- KNX.EIBClose()
+ #self._checkThread = None
+ self.cKNX.EIBClose()
+ def statistics(self):
+ stats = self.KNX.getStatistic()
+ stime = time.time() - stats['time']
+ for s in ["","bytes"]:
+ id = "%s:STATS-Read%sPerSecond" % (self.instanceName,s)
+ readPerSec = float(stats['read%s' % s]) / stime
+ self.WG.DATASTORE.update(id,readPerSec)
+
+ id = "%s:STATS-Write%sPerSecond" % (self.instanceName,s)
+ writePerSec = float(stats['write%s' % s]) / stime
+ self.WG.DATASTORE.update(id,writePerSec)
+
+ stats = self.cKNX.getStatistic()
+ stime = time.time() - stats['time']
+
+ id = "%s:STATS-checkRead%sPerSecond" % (self.instanceName,s)
+ creadPerSec = float(stats['read%s' % s]) / stime
+ self.WG.DATASTORE.update(id,creadPerSec)
+
+ id = "%s:STATS-checkWrite%sPerSecond" % (self.instanceName,s)
+ cwritePerSec = float(stats['write%s' % s]) / stime
+ self.WG.DATASTORE.update(id,cwritePerSec)
+
+ id = "%s:STATS-totalRead%sPerSecond" % (self.instanceName,s)
+ self.WG.DATASTORE.update(id,readPerSec + creadPerSec)
+
+ id = "%s:STATS-totalWrite%sPerSecond" % (self.instanceName,s)
+ self.WG.DATASTORE.update(id,writePerSec + cwritePerSec)
+
+ try:
+ self.eibmutex.acquire()
+ self.debug("QueueWait: %r in %r sec" % (self.QueueWaitTime , stime))
+ waittime = self.QueueWaitTime / stime
+ self.QueueWaitTime = 0.0
+ finally:
+ self.eibmutex.release()
+ id = "%s:STATS-QueueWaitTime" % (self.instanceName)
+ self.WG.DATASTORE.update(id,waittime)
+
+
+
class KNXSendQueue(Queue):
def _init(self, maxsize):
self.maxsize = maxsize
@@ -265,6 +332,7 @@
def _put(self, item):
## add addr to active addr
addr = item[0]
+ item += time.time(),
prio = 0
if len(self.queue) > 10:
## if queue size is over 10 use priority
@@ -278,4 +346,40 @@
addr = item[0]
self.activeaddr.remove(addr)
return item
-
\ No newline at end of file
+
+
+class countEIBConnection(EIBConnection.EIBConnection):
+ def __init__(self):
+ self.statistic_mutex = threading.RLock()
+ self.statistics = {}
+ self.getStatistic()
+ EIBConnection.EIBConnection.__init__(self)
+
+ def getStatistic(self):
+ try:
+ self.statistic_mutex.acquire()
+ return self.statistics.copy()
+ finally:
+ self.statistics = {'read' : 0, 'readbytes' : 0, 'write' : 0, 'writebytes' : 0, 'time' : time.time() }
+ self.statistic_mutex.release()
+
+ def _EIBConnection__EIB_CheckRequest(self, block):
+ ret = EIBConnection.EIBConnection._EIBConnection__EIB_CheckRequest(self,block)
+ try:
+ self.statistic_mutex.acquire()
+ self.statistics['read'] += 1
+ self.statistics['readbytes'] += self.readlen
+ finally:
+ self.statistic_mutex.release()
+ return ret
+
+ def _EIBConnection__EIB_SendRequest(self, data):
+ ret = EIBConnection.EIBConnection._EIBConnection__EIB_SendRequest(self, data)
+ try:
+ self.statistic_mutex.acquire()
+ self.statistics['write'] += 1
+ self.statistics['writebytes'] += len([ (len(data)>>8)&0xff, (len(data))&0xff ] + data)
+ finally:
+ self.statistic_mutex.release()
+ return ret
+
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|