|
From: <ni...@us...> - 2010-11-10 09:40:40
|
Revision: 96
http://openautomation.svn.sourceforge.net/openautomation/?rev=96&view=rev
Author: nilss1
Date: 2010-11-10 09:40:34 +0000 (Wed, 10 Nov 2010)
Log Message:
-----------
use priority based queue for knx messages, add an optional per connector method _shutdown to join local threads on exit
Modified Paths:
--------------
PyWireGate/trunk/connector.py
PyWireGate/trunk/knx_connector/KNX_Connector.py
Modified: PyWireGate/trunk/connector.py
===================================================================
--- PyWireGate/trunk/connector.py 2010-11-09 13:42:57 UTC (rev 95)
+++ PyWireGate/trunk/connector.py 2010-11-10 09:40:34 UTC (rev 96)
@@ -30,6 +30,8 @@
def shutdown(self):
self.log("%s (%s) shutting down" % (self.CONNECTOR_NAME, self.instanceName) ,'info','WireGate')
self.isrunning=False
+ if hasattr(self,'_shutdown'):
+ self._shutdown()
self._thread.join(2)
if self._thread.isAlive():
self.log("Shutdown Failed",'critical')
Modified: PyWireGate/trunk/knx_connector/KNX_Connector.py
===================================================================
--- PyWireGate/trunk/knx_connector/KNX_Connector.py 2010-11-09 13:42:57 UTC (rev 95)
+++ PyWireGate/trunk/knx_connector/KNX_Connector.py 2010-11-10 09:40:34 UTC (rev 96)
@@ -24,6 +24,9 @@
import BusMonitor
import GroupSocket
import DPT_Types
+from Queue import Empty,Full,Queue
+import heapq
+import threading
KNXREADFLAG = 0x00
KNXRESPONSEFLAG = 0x40
@@ -42,10 +45,16 @@
self.KNXBuffer = EIBConnection.EIBBuffer()
self.KNXSrc = EIBConnection.EIBAddr()
self.KNXDst = EIBConnection.EIBAddr()
+
+ #self.sendQueue = Queue.PriorityQueue(maxsize=5000)
+ self.sendQueue = KNXSendQueue(maxsize=5000)
+
+
self.busmon = BusMonitor.busmonitor(self)
self.groupsocket = GroupSocket.groupsocket(self)
self.dpt = DPT_Types.dpt_type(self)
+
self.GrpAddrRegex = re.compile(r"(?:|(\d+)\x2F)(\d+)\x2F(\d+)$",re.MULTILINE)
## Deafaultconfig
@@ -64,6 +73,9 @@
self.start()
def run(self):
+ self._sendThread = threading.Thread(target=self._sendloop)
+ self._sendThread.setDaemon(True)
+ self._sendThread.start()
while self.isrunning:
## Create Socket
try:
@@ -89,6 +101,12 @@
self.debug("Socket %r Closed waiting 5 sec" % self.config['url'])
self.idle(5)
def _shutdown(self):
    """Wait for the KNX send thread to finish.

    The generic connector ``shutdown`` calls this hook (when present)
    after it has cleared ``self.isrunning``, so the send loop is already
    on its way out when the join starts.
    """
    try:
        self._sendThread.join()
    except (AttributeError, RuntimeError):
        # AttributeError: run() never got as far as creating the thread.
        # RuntimeError: the thread was never started, or we are being
        # called from the send thread itself.  In both cases there is
        # nothing to wait for.
        pass
+
def _run(self):
while self.isrunning:
## Check if we are alive and responde until 10 secs
@@ -140,14 +158,29 @@
return addr
def _sendloop(self):
    """Take queued messages from ``self.sendQueue`` and send them to KNX.

    Loops until ``self.isrunning`` becomes false.  The one-second timeout
    on ``get`` ensures that flag is re-checked regularly even when the
    queue stays empty.  A failed send is logged and the loop carries on
    with the next message.
    """
    while self.isrunning:
        # Reset on every pass so that a failure report can never show the
        # address/payload of a previous, unrelated message.
        addr = 0
        msg = []
        try:
            (addr, msg) = self.sendQueue.get(timeout=1)
            self.KNX.EIBSendGroup(addr, msg)
        except Empty:
            # Nothing queued within the timeout; just re-check isrunning.
            pass
        except Exception:
            # Deliberately broad (but not bare): one bad telegram must not
            # kill the send thread, while SystemExit/KeyboardInterrupt
            # still propagate.
            self.WG.errorlog("Failed send %r %r" % (addr, msg))
+
+
+
def send(self,msg,dstaddr):
try:
addr = self.str2grpaddr(dstaddr)
if addr:
msg = [0,KNXWRITEFLAG] +msg
- self.KNX.EIBSendGroup(addr,msg)
+ self.sendQueue.put((addr,msg))
+ #self.KNX.EIBSendGroup(addr,msg)
except:
- self.errormsg("Failed send %r to %r" % (msg,dstaddr))
+ self.WG.errorlog("Failed send %r to %r" % (msg,dstaddr))
def setValue(self,dsobj,msg=False):
try:
@@ -156,4 +189,38 @@
self.debug("SEND %r to %s (%s)" % (msg,dsobj.name,dsobj.id))
self.send(self.dpt.encode(msg,dsobj=dsobj),dsobj.id)
except:
- print "----------- ERROR IN KNX_CONNECTOR.setValue ----------------"
\ No newline at end of file
+ print "----------- ERROR IN KNX_CONNECTOR.setValue ----------------"
+
+
class KNXSendQueue(Queue):
    """Send queue that pushes back bursts aimed at a single KNX address.

    Items are ``(addr, msg)`` tuples.  A message is queued with normal
    priority (0) unless more than ``BURST_LIMIT`` messages for the same
    address are already waiting, in which case it gets low priority (1).
    Messages of equal priority leave the queue in the order they were
    put, regardless of their address or payload.
    """

    # Number of already-queued messages for one address above which further
    # messages for that address are queued with low priority.
    BURST_LIMIT = 5

    def _init(self, maxsize):
        """Set up the heap, the per-message address log and the counter."""
        self.maxsize = maxsize
        self.queue = []
        # One entry per queued message; duplicates are intentional because
        # the number of occurrences of an address drives its priority.
        self.activeaddr = []
        # Monotonic tie-breaker: keeps FIFO order inside one priority level
        # and stops heapq from ever comparing the (addr, msg) items.
        self._sequence = 0

    def _qsize(self):
        """Return the number of queued messages."""
        return len(self.queue)

    def _empty(self):
        """Return True when no message is queued."""
        return not self.queue

    def _full(self):
        """Return True when a positive ``maxsize`` has been reached."""
        return self.maxsize > 0 and len(self.queue) >= self.maxsize

    def _put(self, item):
        """Queue ``item`` (an ``(addr, msg)`` tuple) with its priority."""
        addr = item[0]
        # The count is taken before this message is registered.
        prio = int(self.activeaddr.count(addr) > self.BURST_LIMIT)
        self.activeaddr.append(addr)
        heapq.heappush(self.queue, (prio, self._sequence, item))
        self._sequence += 1

    def _get(self):
        """Remove and return the next ``(addr, msg)`` tuple."""
        _prio, _seq, item = heapq.heappop(self.queue)
        self.activeaddr.remove(item[0])
        return item
+
\ No newline at end of file
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|