From: <ni...@us...> - 2010-11-06 23:29:10
|
Revision: 91 http://openautomation.svn.sourceforge.net/openautomation/?rev=91&view=rev Author: nilss1 Date: 2010-11-06 23:29:03 +0000 (Sat, 06 Nov 2010) Log Message: ----------- no longer get WireGateInstance as Argument to new Classes, instead use parent Class and make self.WG = parent.WG. This way each class can reach all parents using self._parent Modified Paths: -------------- PyWireGate/trunk/WireGate.py PyWireGate/trunk/comet_server/CometServer.py PyWireGate/trunk/connector.py PyWireGate/trunk/console_server/consoleserver.py PyWireGate/trunk/cycler.py PyWireGate/trunk/datastore.py PyWireGate/trunk/knx_connector/BusMonitor.py PyWireGate/trunk/knx_connector/DPT_Types.py PyWireGate/trunk/knx_connector/GroupSocket.py PyWireGate/trunk/knx_connector/KNX_Connector.py PyWireGate/trunk/owfs_connector/OWFS_Connector.py Modified: PyWireGate/trunk/WireGate.py =================================================================== --- PyWireGate/trunk/WireGate.py 2010-11-06 22:51:09 UTC (rev 90) +++ PyWireGate/trunk/WireGate.py 2010-11-06 23:29:03 UTC (rev 91) @@ -31,6 +31,7 @@ class WireGate(daemon.Daemon): def __init__(self,REDIRECTIO=False): + self._parent = self self.WG = self self.watchdoglist = {} self.connectors = {} Modified: PyWireGate/trunk/comet_server/CometServer.py =================================================================== --- PyWireGate/trunk/comet_server/CometServer.py 2010-11-06 22:51:09 UTC (rev 90) +++ PyWireGate/trunk/comet_server/CometServer.py 2010-11-06 23:29:03 UTC (rev 91) @@ -33,8 +33,12 @@ CONNECTOR_NAME = 'Comet Server' CONNECTOR_VERSION = 0.1 CONNECTOR_LOGNAME = 'comet_server' - def __init__(self,WireGateInstance, instanceName): - self.WG = WireGateInstance + def __init__(self,parent, instanceName): + self._parent = parent + if parent: + self.WG = parent.WG + else: + self.WG = False self.instanceName = instanceName defaultconfig = { 'port' : 4498 Modified: PyWireGate/trunk/connector.py 
=================================================================== --- PyWireGate/trunk/connector.py 2010-11-06 22:51:09 UTC (rev 90) +++ PyWireGate/trunk/connector.py 2010-11-06 23:29:03 UTC (rev 91) @@ -7,8 +7,9 @@ CONNECTOR_LOGNAME = __name__ isrunning=False - def __init__(self,WireGateInstance,instanceName): - self.WG = WireGateInstance + def __init__(self,parent,instanceName): + self._parent = parent + self.WG = parent.WG self.instanceName = instanceName """Overide""" def start(self): @@ -24,7 +25,7 @@ def log(self,msg,severity='info',instance=False): if not instance: instance = self.instanceName - self.WG.log(msg,severity,instance) + self._parent.log(msg,severity,instance) def shutdown(self): self.log("%s (%s) shutting down" % (self.CONNECTOR_NAME, self.instanceName) ,'info','WireGate') Modified: PyWireGate/trunk/console_server/consoleserver.py =================================================================== --- PyWireGate/trunk/console_server/consoleserver.py 2010-11-06 22:51:09 UTC (rev 90) +++ PyWireGate/trunk/console_server/consoleserver.py 2010-11-06 23:29:03 UTC (rev 91) @@ -35,8 +35,12 @@ CONNECTOR_VERSION = 0.1 CONNECTOR_LOGNAME = 'console_server' - def __init__(self,WireGateInstance, instanceName): - self.WG = WireGateInstance + def __init__(self,parent, instanceName): + self._parent = parent + if parent: + self.WG = parent.WG + else: + self.WG = False self.instanceName = instanceName defaultconfig = { 'port' : 4401 Modified: PyWireGate/trunk/cycler.py =================================================================== --- PyWireGate/trunk/cycler.py 2010-11-06 22:51:09 UTC (rev 90) +++ PyWireGate/trunk/cycler.py 2010-11-06 23:29:03 UTC (rev 91) @@ -20,101 +20,146 @@ import time -class cycler: - def __init__(self,WireGateInstance): - self.WG = WireGateInstance - +class TaskRunner: + def __init__(self,parent): + self._parent = parent + if parent: + self.WG = parent.WG + else: + self.WG = False self.isrunning = True - ## Dummy Timer + self.waiting = 
threading.Timer(0,lambda: None) - self.waiting.setDaemon(1) + + self.mutex = threading.RLock() + + ## function for time based sortingg of tasks self.getTime = lambda x: x.getTime - self.timerList = [] + + ## all not running tasks goes here + self.taskList = [] + + ## all started tasks goes here self.running = {} def debug(self,msg): print "DEBUG Cycler: %s" % msg - def remove(self, obj): + + def remove(self, task): if not self.isrunning: - ## dont try to get a mutex + + ## dont try to get a mutex on shutdown return False try: self.mutex.acquire() - if obj in self.timerList: + if task in self.taskList: try: - self.timerList.remove(obj) + self.taskList.remove(task) except: pass - self.debug("Removed %r" % obj.action) - if len(self.timerList) == 0: + self.debug("Removed %r" % task.action) + if len(self.taskList) == 0: ## kill waiting timer self.debug("Cancel GLobal wait") self.waiting.cancel() - if obj in self.running: + + if task in self.running: try: - if self.running[obj].isAlive(): - self.running[obj].cancel() - self.debug("Canceled %r" % obj.args) + if self.running[task].isAlive(): + self.running[task].cancel() + self.debug("Canceled %r" % task.args) finally: - self.debug("terminated %r" % obj.args) - del self.running[obj] + self.debug("terminated %r" % task.args) + del self.running[task] finally: self.mutex.release() + + + def event(self,rtime,function,*args,**kwargs): + if not self.isrunning: + ## dont try to get a mutex + return False + self.debug("adding event: %r (%r / %r)" % (function,args,kwargs)) + ## rtime is a date as unix timestamp + rtime = rtime - time.time() + if rtime >0: + task = Task(self,rtime,function,args=args,kwargs=kwargs) + self._Shedule(task) + return task + else: + print "expired %r (%r / %r)" % (function,args,kwargs) + def add(self,rtime,function,*args,**kwargs): if not self.isrunning: ## dont try to get a mutex return False self.debug("adding task: %r (%r / %r)" % (function,args,kwargs)) - 
self.addShedule(sheduleObject(self,self.WG,rtime,function,args=args,kwargs=kwargs)) + task = Task(self,rtime,function,args=args,kwargs=kwargs) + self._Shedule(task) + return task def cycle(self,rtime,function,*args,**kwargs): if not self.isrunning: ## dont try to get a mutex return False self.debug("adding cycliv task: %r (%r / %r)" % (function,args,kwargs)) - self.addShedule(sheduleObject(self,self.WG,rtime,function,cycle=True,args=args,kwargs=kwargs)) + task = Task(self,rtime,function,cycle=True,args=args,kwargs=kwargs) + self._Shedule(task) + return task - def addShedule(self,shed): - print "ADD _shed %r" % shed + def _Shedule(self,task): + print "adding task %r" % task self.mutex.acquire() - self.timerList.append(shed) + self.taskList.append(task) + ## Try to stop running timer + ## Fixme isalive try: self.waiting.cancel() except: pass - self.timerList.sort(key=self.getTime) + + self.taskList.sort(key=self.getTime) self.mutex.release() ## check if any Timer need activation self._check() - return shed - - + def _check(self): self.debug("Cycle") try: self.mutex.acquire() + print "acitve tasks %d " % threading.activeCount() + atasks = "Active tasks: " + for rtask in threading.enumerate(): + atasks += " %r" % rtask.getName() + + + print atasks ## all actions that need activation in next 60seconds - for shedobj in filter(lambda x: x.getTime() < 60, self.timerList): + for task in filter(lambda x: x.getTime() < 60, self.taskList): try: - self.running[shedobj] = threading.Timer(shedobj.getTime(),shedobj.run) - self.running[shedobj].start() - #print "run %s" % t.name + exectime = task.getTime() + name = "event" + if task.cycle: + name = "cycle" + self.running[task] = threading.Timer(exectime,task.run) + self.running[task].setName("%s_%r" % (name,time.asctime(time.localtime(task.timer)))) + self.running[task].start() except: print "Failed" raise - ## remove from List because its now in the past - self.timerList.remove(shedobj) + ## remove from List because its now active 
+ self.taskList.remove(task) finally: - if len(self.timerList) >0: - print "Wait for later timer %r" % (self.timerList[0].getTime()-5) - self.waiting = threading.Timer(self.timerList[0].getTime()-5 ,self._check) + if len(self.taskList) >0: + print "Wait for later timer %r" % (self.taskList[0].getTime()-5) + self.waiting = threading.Timer(self.taskList[0].getTime()-5 ,self._check) self.waiting.start() self.mutex.release() @@ -125,35 +170,36 @@ try: ## stop all new timer self.mutex.acquire() + self.taskList = [] + for task in self.running.keys(): + print "acitve tasks %d " % threading.activeCount() + print "cancel task %r" % task.args + try: + rtask = self.running.pop(task) + rtask.cancel() + rtask.join() + except: + pass + + print self.waiting + print "Have Mutex" - self.waiting.cancel() try: - self.waiting.join(2) + self.waiting.cancel() + self.waiting.join() except: - ## maybe not even running pass print "Thread canceld" - self.timerList = [] - for obj in self.running.keys(): - print "cancel task %r" % obj.args - try: - tobecanceled = self.running.pop(obj) - - except: - pass - tobecanceled.cancel() - tobecanceled.is - tobecanceled.join(2) - - + + except: self.debug("SHUTDOWN FAILED") -class sheduleObject: - def __init__(self,parent,WireGateInstance,rtime,function,cycle=False,args = [],kwargs={}): +class Task: + def __init__(self,parent,rtime,function,cycle=False,args = [],kwargs={}): self.Parent = parent - self.WG = WireGateInstance + self.WG = parent.WG self.delay = rtime self.cycle = cycle self._set() @@ -171,7 +217,7 @@ self.Parent.remove(self) if self.cycle: self._set() - self.Parent.addShedule(self) + self.Parent._Shedule(self) def getTime(self): @@ -181,23 +227,37 @@ if __name__ == '__main__': try: - cycle = cycler(False) + cycle = TaskRunner(False) import sys import atexit - atexit.register(cycle.shutdown) + #atexit.register(cycle.shutdown) def write_time(text=''): print "running %s: %f" % (text,time.time()) write_time('Main') - 
cycle.cycle(4,write_time,"Cycletask1!") - #longtask=cycle.add(80,write_time,"task2!") - #f=cycle.add(7,write_time,"task3!") + cycle1 = cycle.cycle(4,write_time,"Cycletask1!") + cycle2 = cycle.cycle(8,write_time,"Cycletask2!") + + longtask=cycle.add(80,write_time,"Longtask 80 secs!") + + f=cycle.add(7,write_time,"task3!") + + cycle.event(time.mktime((2010,11,6,21,54,00,0,0,0)),write_time,"event 21:54") time.sleep(2) - #cycle.remove(f) - #time.sleep(5) - #cycle.remove(longtask) + cycle.remove(f) + time.sleep(5) + print "##################remove longtask %r" % longtask + cycle.remove(longtask) + + #cycle.cycle(4,write_time,"Cycletask6!") + print "KILL CYCLE" + cycle.remove(cycle1) + cycle.remove(cycle2) #cycle.shutdown() #cycle.add(6,write_time,"task4!") + while threading.activeCount() > 1: + time.sleep(1) except KeyboardInterrupt: - #cycle.shutdown() + cycle.shutdown() + cycle.waiting.join(5) sys.exit(0) Modified: PyWireGate/trunk/datastore.py =================================================================== --- PyWireGate/trunk/datastore.py 2010-11-06 22:51:09 UTC (rev 90) +++ PyWireGate/trunk/datastore.py 2010-11-06 23:29:03 UTC (rev 91) @@ -15,7 +15,7 @@ """ Datastore Instance """ - def __init__(self,WireGateInstance): + def __init__(self,parent): #################################################### ## Function: __init__ ## Parameter: @@ -24,7 +24,8 @@ ## Contructor for the DATASTORE instance ## #################################################### - self.WG = WireGateInstance + self._parent = parent + self.WG = parent.WG self.log("DATASTORE starting up") self.DBLOADED = False self.dataobjects = {} @@ -82,7 +83,7 @@ type(self.dataobjects[id]) except KeyError: ## create a new one if it don't exist - self.dataobjects[id] = dataObject(self.WG,id) + self.dataobjects[id] = dataObject(self,id) ## return it self.locked.release() return self.dataobjects[id] @@ -95,7 +96,7 @@ loaddict = json.load(db) db.close() for name, obj in loaddict.items(): - self.dataobjects[name] = 
dataObject(self.WG,obj['id'],obj['name']) + self.dataobjects[name] = dataObject(self,obj['id'],obj['name']) self.dataobjects[name].lastupdate = obj['lastupdate'] self.dataobjects[name].config = obj['config'] self.dataobjects[name].connected = obj['connected'] @@ -154,8 +155,9 @@ class dataObject: - def __init__(self,WireGateInstance,id,name=False): - self.WG = WireGateInstance + def __init__(self,parent,id,name=False): + self._parent = parent + self.WG = parent.WG ## Threadlocking self.write_mutex = threading.RLock() Modified: PyWireGate/trunk/knx_connector/BusMonitor.py =================================================================== --- PyWireGate/trunk/knx_connector/BusMonitor.py 2010-11-06 22:51:09 UTC (rev 90) +++ PyWireGate/trunk/knx_connector/BusMonitor.py 2010-11-06 23:29:03 UTC (rev 91) @@ -20,12 +20,15 @@ import time class busmonitor: - def __init__(self, WireGateInstance,connectorInstance): - self.WG = WireGateInstance - self.KNX = connectorInstance + def __init__(self, parent): + self._parent = parent + if parent: + self.WG = parent.WG + else: + self.WG = False self.nicehex=lambda x: " ".join(map(lambda y:"%.2x" % y,x)) self.tobinstr=lambda n,b=8: "".join([str((n >> y) & 1) for y in range(b-1, -1, -1)]) - self.dpt = DPT_Types.dpt_type(WireGateInstance) + self.dpt = DPT_Types.dpt_type(self) ## FIXME: Not fully implemented self.apcicodes = { @@ -101,7 +104,7 @@ if msg['ctrl2']['DestAddrType'] == 0 and msg['apdu']['tpdu'] == "T_DATA_XXX_REQ": msg['dstaddr'] = self._decodeGrpAddr(buf[3:5]) - id = "%s:%s" % (self.KNX.instanceName, msg['dstaddr']) + id = "%s:%s" % (self._parent.instanceName, msg['dstaddr']) ## search Datastoreobject dsobj = self.WG.DATASTORE.get(id) @@ -265,10 +268,15 @@ except KeyError: pass return apci + + def log(self,msg,severity='info',instance=False): + if not instance: + instance = self.instanceName + self._parent.log(msg,severity,instance) + def debug(self,msg): - #print "DEBUG: BUSMON: "+ repr(msg) - pass + self.log("DEBUG: 
BUSMON: "+ repr(msg),'debug') if __name__ == "__main__": Modified: PyWireGate/trunk/knx_connector/DPT_Types.py =================================================================== --- PyWireGate/trunk/knx_connector/DPT_Types.py 2010-11-06 22:51:09 UTC (rev 90) +++ PyWireGate/trunk/knx_connector/DPT_Types.py 2010-11-06 23:29:03 UTC (rev 91) @@ -23,8 +23,12 @@ import struct class dpt_type: - def __init__(self,WireGateInstance): - self.WG = WireGateInstance + def __init__(self,parent): + self._parent = parent + if parent: + self.WG = parent.WG + else: + self.WG = False self.DECODER = { 1:self.decodeDPT1, # EIS 1/7 / 1 bit 0=Aus/1=Ein 2:self.decodeDPT2, # EIS 8 / 2 bit 0,1=Frei/2=Prio_Aus/3=Prio_Ein @@ -133,8 +137,8 @@ def log(self,msg,severity='info',instance=False): if not instance: instance = "dpt-types" - if self.WG: - self.WG.log(msg,severity,instance) + if self._parent: + self._parent.log(msg,severity,instance) def toByteArray(self,val,length): ## Set ByteArray Modified: PyWireGate/trunk/knx_connector/GroupSocket.py =================================================================== --- PyWireGate/trunk/knx_connector/GroupSocket.py 2010-11-06 22:51:09 UTC (rev 90) +++ PyWireGate/trunk/knx_connector/GroupSocket.py 2010-11-06 23:29:03 UTC (rev 91) @@ -20,12 +20,15 @@ import time class groupsocket: - def __init__(self, WireGateInstance, connectorInstance): - self.WG = WireGateInstance - self.KNX = connectorInstance + def __init__(self, parent): + self._parent = parent + if parent: + self.WG = parent.WG + else: + self.WG = False self.nicehex=lambda x: " ".join(map(lambda y:"%.2x" % y,x)) self.tobinstr=lambda n,b=8: "".join([str((n >> y) & 1) for y in range(b-1, -1, -1)]) - self.dpt = DPT_Types.dpt_type(WireGateInstance) + self.dpt = DPT_Types.dpt_type(self) def decode(self,buf,src,dst): ## Accept List Hex or Binary Data @@ -49,7 +52,7 @@ msg['srcaddr'] = self._decodePhysicalAddr(src) try: msg['dstaddr'] = self._decodeGrpAddr(dst) - id = "%s:%s" % 
(self.KNX.instanceName, msg['dstaddr']) + id = "%s:%s" % (self._parent.instanceName, msg['dstaddr']) if (buf[0] & 0x3 or (buf[1] & 0xC0) == 0xC0): ##FIXME: unknown APDU self.debug("unknown APDU from "+msg['srcaddr']+" to "+msg['dstaddr']+ " raw:"+buf) @@ -88,9 +91,14 @@ def _decodeGrpAddr(self,raw): return "%d/%d/%d" % ((raw >> 11) & 0x1f, (raw >> 8) & 0x07, (raw) & 0xff) + + def log(self,msg,severity='info',instance=False): + if not instance: + instance = self.instanceName + self._parent.log(msg,severity,instance) def debug(self,msg): - #print "DEBUG: GROUPSOCKET: "+ repr(msg) + self.log("DEBUG: GROUPSOCKET: "+ repr(msg),'debug') pass Modified: PyWireGate/trunk/knx_connector/KNX_Connector.py =================================================================== --- PyWireGate/trunk/knx_connector/KNX_Connector.py 2010-11-06 22:51:09 UTC (rev 90) +++ PyWireGate/trunk/knx_connector/KNX_Connector.py 2010-11-06 23:29:03 UTC (rev 91) @@ -33,17 +33,18 @@ CONNECTOR_NAME = 'KNX Connector' CONNECTOR_VERSION = 0.2 CONNECTOR_LOGNAME = 'knx_connector' - def __init__(self,WireGateInstance, instanceName): - self.WG = WireGateInstance + def __init__(self,parent, instanceName): + self._parent = parent + self.WG = parent.WG self.instanceName = instanceName self.KNX = EIBConnection.EIBConnection() self.KNXBuffer = EIBConnection.EIBBuffer() self.KNXSrc = EIBConnection.EIBAddr() self.KNXDst = EIBConnection.EIBAddr() - self.busmon = BusMonitor.busmonitor(WireGateInstance,self) - self.groupsocket = GroupSocket.groupsocket(WireGateInstance,self) - self.dpt = DPT_Types.dpt_type(WireGateInstance) + self.busmon = BusMonitor.busmonitor(self) + self.groupsocket = GroupSocket.groupsocket(self) + self.dpt = DPT_Types.dpt_type(self) self.GrpAddrRegex = re.compile(r"(?:|(\d+)\x2F)(\d+)\x2F(\d+)$",re.MULTILINE) Modified: PyWireGate/trunk/owfs_connector/OWFS_Connector.py =================================================================== --- PyWireGate/trunk/owfs_connector/OWFS_Connector.py 2010-11-06 
22:51:09 UTC (rev 90) +++ PyWireGate/trunk/owfs_connector/OWFS_Connector.py 2010-11-06 23:29:03 UTC (rev 91) @@ -26,8 +26,12 @@ CONNECTOR_NAME = 'OWFS Connector' CONNECTOR_VERSION = 0.1 CONNECTOR_LOGNAME = 'owfs_connector' - def __init__(self,WireGateInstance, instanceName): - self.WG = WireGateInstance + def __init__(self,parent, instanceName): + self._parent = parent + if parent: + self.WG = parent.WG + else: + self.WG = False self.instanceName = instanceName defaultconfig = { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |