From: <bni...@us...> - 2008-05-16 01:09:16
|
Revision: 1353 http://ganglia.svn.sourceforge.net/ganglia/?rev=1353&view=rev Author: bnicholes Date: 2008-05-15 18:09:19 -0700 (Thu, 15 May 2008) Log Message: ----------- Allow the datastore class to do all of the summaries for each cluster as well as the grid. Switch the xmlWriter and rrd_summary plugin to use the summary data rather than doing their own summaries. Modified Paths: -------------- trunk/monitor-core/gmetad-python/gmetad_config.py.in trunk/monitor-core/gmetad-python/gmetad_data.py trunk/monitor-core/gmetad-python/gmetad_element.py trunk/monitor-core/gmetad-python/gmetad_gmondReader.py trunk/monitor-core/gmetad-python/gmetad_xmlWriter.py trunk/monitor-core/gmetad-python/plugins/rrd_plugin.py trunk/monitor-core/gmetad-python/plugins/rrd_summary_plugin.py Modified: trunk/monitor-core/gmetad-python/gmetad_config.py.in =================================================================== --- trunk/monitor-core/gmetad-python/gmetad_config.py.in 2008-05-15 18:02:21 UTC (rev 1352) +++ trunk/monitor-core/gmetad-python/gmetad_config.py.in 2008-05-16 01:09:19 UTC (rev 1353) @@ -63,7 +63,6 @@ XML_PORT = 'xml_port' INTERACTIVE_PORT = 'interactive_port' SERVER_THREADS = 'server_threads' - RRD_ROOTDIR = 'rrd_rootdir' VERSION = '@GANGLIA_VERSION@' PLUGINS_DIR = 'plugins_dir' @@ -72,13 +71,6 @@ LOGFILE : None, PIDFILE : None, DATA_SOURCE : [], - #RRAS : [ - # GmetadRRA('AVERAGE:0.5:1:244'), - # GmetadRRA('AVERAGE:0.5:24:244'), - # GmetadRRA('AVERAGE:0.5:168:244'), - # GmetadRRA('AVERAGE:0.5:672:244'), - # GmetadRRA('AVERAGE:0.5:5760:374') - #], SCALABLE : True, GRIDNAME : 'unspecified', AUTHORITY : 'http://%s/ganglia/' % getfqdn(), @@ -106,7 +98,6 @@ GmetadConfig.DEBUG_LEVEL : self.parseDbgLevel, GmetadConfig.LOGFILE : self.parseLogfile, GmetadConfig.DATA_SOURCE : self.parseDataSource, - # GmetadConfig.RRAS : self.parseRRAs, GmetadConfig.SCALABLE : self.parseScalable, GmetadConfig.GRIDNAME : self.parseGridname, GmetadConfig.AUTHORITY : self.parseAuthority, @@ -117,7 +108,6 @@ GmetadConfig.XML_PORT : self.parseXmlPort, GmetadConfig.INTERACTIVE_PORT : self.parseInteractivePort, GmetadConfig.SERVER_THREADS : self.parseServerThreads, - # GmetadConfig.RRD_ROOTDIR : self.parseRrdRootdir, GmetadConfig.PLUGINS_DIR : self.parsePluginsDir } self.updateConfig() @@ -146,7 +136,6 @@ try: kw, args = line.strip().split(None,1) except ValueError: - print 'Invalid syntax: %s' % line kw = line pass break @@ -174,22 +163,24 @@ def getSection(self, id): ret = None + secID = id.lower() try: - ret = self.sections[id] + ret = self.sections[secID] except KeyError: pass return ret def _setSection(self, id, f): kw = '' - self.sections[id] = [] + secID = id.lower() + self.sections[secID] = [] while kw.strip() != '}': kw,args = self.GmetadReadline(f) if kw is None: break if args is None: continue - self.sections[id].append( [kw,args]) + self.sections[secID].append( [kw,args]) def resetToDefaults(self): @@ -215,11 +206,6 @@ hosts = a[0:] self.cfg[GmetadConfig.DATA_SOURCE].append(GmetadDataSource(name, hosts, interval)) - #def parseRRAs(self, args): - # self.cfg[GmetadConfig.RRAS] = [] - # for rraspec in args: - # self.cfg[GmetadConfig.RRAS].append(GmetadRRA(rraspec.strip().strip('"').split(':',1)[1])) - def parseScalable(self, arg): v = arg.strip().lower() if v == 'off' or v == 'false' or v == 'no': @@ -269,11 +255,6 @@ if v.isdigit(): self.cfg[GmetadConfig.SERVER_THREADS] = int(v) - #def parseRrdRootdir(self, arg): - # v = arg.strip().strip('"') - # if os.path.isdir(v): - # self.cfg[GmetadConfig.RRD_ROOTDIR] = v - def parsePluginsDir(self, arg): v = arg.strip().strip('"') if os.path.isdir(v): Modified: trunk/monitor-core/gmetad-python/gmetad_data.py =================================================================== --- trunk/monitor-core/gmetad-python/gmetad_data.py 2008-05-15 18:02:21 UTC (rev 1352) +++ trunk/monitor-core/gmetad-python/gmetad_data.py 2008-05-16 01:09:19 UTC (rev 1353) @@ -31,14 +31,14 @@ #* Brad Nicholes (bnicholes novell.com) #******************************************************************************/ -import thread +import thread, threading import logging import time -import copy from gmetad_element import Element from gmetad_config import getConfig, GmetadConfig from gmetad_notifier import GmetadNotifier +from gmetad_random import getRandomInterval class DataStore: _shared_state = {} @@ -60,28 +60,49 @@ self.setNode(Element('GRID', {'NAME':cfg[GmetadConfig.GRIDNAME], 'AUTHORITY':cfg[GmetadConfig.AUTHORITY], 'LOCALTIME':'%d' % time.time()}), self.rootElement) self.lock.release() + self.gridSummary = DataStoreGridSummary() + self.gridSummary.start() self.notifier = GmetadNotifier() self.notifier.start() DataStore._initialized = True - + def _doSummary(self, clusterNode): - clusterNode.summary = {} + self.acquireLock(self) + clusterNode.summaryData = {} + clusterNode.summaryData['summary'] = {} + clusterNode.summaryData['hosts_up'] = 0 + clusterNode.summaryData['hosts_down'] = 0 for hostNode in clusterNode: + # Sum up the status of all of the hosts + if 'HOST' == hostNode.id: + try: + if int(hostNode.tn) < int(hostNode.tmax)*4: + clusterNode.summaryData['hosts_up'] += 1 + else: + clusterNode.summaryData['hosts_down'] += 1 + except AttributeError: + pass + except KeyError: + pass for metricNode in hostNode: #if metricNode.type in ['string', 'timestamp'] or metricNode.slope == 'zero': if metricNode.type in ['string', 'timestamp']: continue try: - summaryNode = clusterNode.summary[str(metricNode)] - summaryNode.val = float(summaryNode.val) + float(metricNode.val) + summaryNode = clusterNode.summaryData['summary'][str(metricNode)] + summaryNode.sum += float(metricNode.val) except KeyError: - summaryNode = copy.copy(metricNode) - clusterNode.summary[str(summaryNode)] = summaryNode + summaryNode = metricNode.summaryCopy('METRICS') + summaryNode.sum = float(metricNode.val) + summaryNode.type = 'double' + clusterNode.summaryData['summary'][str(summaryNode)] = summaryNode summaryNode.num = 0 summaryNode.num += 1 + self.releaseLock(self) def shutdown(self): self.notifier.shutdown() + self.gridSummary.shutdown() def setNode(self, node, parent=None): if parent is None: @@ -89,6 +110,15 @@ self.rootElement = node self.rootElement.source = 'gmetad' return self.rootElement + if str(node) in parent.children: + try: + node.children = parent[str(node)].children + except AttributeError: + pass + try: + node.summaryData = parent[str(node)].summaryData + except AttributeError: + pass parent[str(node)] = node return parent[str(node)] @@ -114,3 +144,73 @@ self._doSummary(clusterNode); self.notifier.insertTransaction(clusterNode) + def acquireLock(self, obj): + self.lock.acquire() + logging.debug('DataStore lock acquired %s'%str(obj)) + + def releaseLock(self, obj): + self.lock.release() + logging.debug('DataStore lock released%s'%str(obj)) + +class DataStoreGridSummary(threading.Thread): + def __init__(self): + threading.Thread.__init__(self) + + self._cond = threading.Condition() + self._running = False + self._shuttingDown = False + + def _doGridSummary(self): + ds = DataStore() + rootNode = ds.rootElement + if rootNode is None: return + ds.acquireLock(self) + try: + for gridNode in rootNode: + gridNode.summaryData = {} + gridNode.summaryData['summary'] = {} + gridNode.summaryData['hosts_up'] = 0 + gridNode.summaryData['hosts_down'] = 0 + for clusterNode in gridNode: + # Sum up the status of all of the hosts + try: + gridNode.summaryData['hosts_up'] += clusterNode.summaryData['hosts_up'] + gridNode.summaryData['hosts_down'] += clusterNode.summaryData['hosts_down'] + except AttributeError: + pass + except KeyError: + pass + for metricNode in clusterNode.summaryData['summary'].itervalues(): + if metricNode.type in ['string', 'timestamp']: + continue + try: + summaryNode = gridNode.summaryData['summary'][str(metricNode)] + summaryNode.sum += metricNode.sum + except KeyError: + summaryNode = metricNode.summaryCopy() + gridNode.summaryData['summary'][str(summaryNode)] = summaryNode + summaryNode.num = 0 + summaryNode.num += 1 + except Exception, e: + print e + ds.releaseLock(self) + + def run(self): + if self._running: + return + self._running = True + while not self._shuttingDown: + self._cond.acquire() + # wait a random time between 10 and 30 seconds + self._cond.wait(getRandomInterval(20, 10)) + self._cond.release() + if not self._shuttingDown: + self._doGridSummary() + + def shutdown(self): + self._shuttingDown = True + self._cond.acquire() + self._cond.notifyAll() + self._cond.release() + self.join() + Modified: trunk/monitor-core/gmetad-python/gmetad_element.py =================================================================== --- trunk/monitor-core/gmetad-python/gmetad_element.py 2008-05-15 18:02:21 UTC (rev 1352) +++ trunk/monitor-core/gmetad-python/gmetad_element.py 2008-05-16 01:09:19 UTC (rev 1353) @@ -73,7 +73,7 @@ return self.children.itervalues() def __copy__(self): - cp = Element(str(self), {}) + cp = Element(self.id, {}) for k in self.__dict__.keys(): if k == 'children': cp.children = {} @@ -84,3 +84,14 @@ pass return cp + def summaryCopy(self, id=None): + if id is None: + id = self.id + cp = Element(id, {}) + for k in self.__dict__.keys(): + try: + if k.lower() in ['name', 'sum', 'num', 'type', 'units', 'slop', 'source', 'children']: + cp.__dict__[k.lower()] = copy.copy(self.__dict__[k]) + except ValueError: + pass + return cp Modified: trunk/monitor-core/gmetad-python/gmetad_gmondReader.py =================================================================== --- trunk/monitor-core/gmetad-python/gmetad_gmondReader.py 2008-05-15 18:02:21 UTC (rev 1352) +++ trunk/monitor-core/gmetad-python/gmetad_gmondReader.py 2008-05-16 01:09:19 UTC (rev 1353) @@ -53,7 +53,7 @@ ds = DataStore() e = Element(tag, attrs) if 'GANGLIA_XML' == tag: - ds.lock.acquire() + ds.acquireLock(self) self._elemStack.append(ds.getNode()) # Fetch the root node. It has already been set into the tree. self._elemStackLen += 1 cfg = getConfig() @@ -67,7 +67,7 @@ def endElement(self, tag): if tag == 'GANGLIA_XML': - DataStore().lock.release() + DataStore().releaseLock(self) self._elemStack.pop() self._elemStackLen -= 1 Modified: trunk/monitor-core/gmetad-python/gmetad_xmlWriter.py =================================================================== --- trunk/monitor-core/gmetad-python/gmetad_xmlWriter.py 2008-05-15 18:02:21 UTC (rev 1352) +++ trunk/monitor-core/gmetad-python/gmetad_xmlWriter.py 2008-05-16 01:09:19 UTC (rev 1353) @@ -106,47 +106,31 @@ #Returns a tuple of the form (hosts_up, hosts_down). hosts_up = 0 hosts_down = 0 - for c in clusternode.children.values(): - if 'HOST' == c.id: - try: - if int(c.tn) < int(c.tmax)*4: - hosts_up += 1 - else: - hosts_down += 1 - except AttributeError: - pass + if hasattr(clusternode, 'summaryData'): + hosts_up = clusternode.summaryData['hosts_up'] + hosts_down = clusternode.summaryData['hosts_down'] return (hosts_up, hosts_down) def _getGridSummary(self, gridnode, filterList, queryargs): - totalHostsUp = 0 - totalHostsDown = 0 cbuf = '' + hosts = self._getNumHostsForCluster(gridnode) + if hasattr(gridnode, 'summaryData'): + for m in gridnode.summaryData['summary'].itervalues(): + cbuf += self._getXmlImpl(m, filterList, queryargs) + rbuf = '<HOSTS UP="%d" DOWN="%d" SOURCE="gmetad" />\n%s' % (hosts[0], hosts[1], cbuf) + cbuf = '' for c in gridnode.children.values(): if 'CLUSTER' == c.id: - hosts = self._getNumHostsForCluster(c) - totalHostsUp += hosts[0] - totalHostsDown += hosts[1] - cbuf += self._getXmlImpl(c, filterList, queryargs) - rbuf = '<HOSTS UP="%d" DOWN="%d" SOURCE="gmetad" />%s\n' % (totalHostsUp, totalHostsDown, cbuf) + rbuf += self._getXmlImpl(c, filterList, queryargs) return rbuf - def _getClusterSummary(self, clusternode): - rbuf = '<HOSTS UP="%d" DOWN="%d" SOURCE="gmetad" />\n' % self._getNumHostsForCluster(clusternode) - metrics = {} - for h in clusternode.children.values(): - if 'HOST' == h.id: - for m in h.children.values(): - if 'METRIC' == m.id and 'zero' != m.slope: - if not metrics.has_key(m.name): - metrics[m.name] = {'SUM':0.0, 'NUM':1, 'TYPE':m.type, 'UNITS':'double', 'SLOPE':m.slope, 'SOURCE':m.source} - else: - metrics[m.name]['NUM'] += 1 - metrics[m.name]['SUM'] += float(m.val) - for mn, md in metrics.items(): - rbuf += '<METRICS NAME="%s"' % mn - for k, v in md.items(): - rbuf += ' %s="%s"' % (k, v) - rbuf += ' />\n' + def _getClusterSummary(self, clusternode, filterList, queryargs): + cbuf = '' + hosts = self._getNumHostsForCluster(clusternode) + if hasattr(clusternode, 'summaryData'): + for m in clusternode.summaryData['summary'].itervalues(): + cbuf += self._getXmlImpl(m, filterList, queryargs) + rbuf = '<HOSTS UP="%d" DOWN="%d" SOURCE="gmetad" />\n%s' % (hosts[0], hosts[1], cbuf) return rbuf def _getXmlImpl(self, element, filterList=None, queryargs=None): @@ -160,7 +144,7 @@ except AttributeError: pass for k,v in element.__dict__.items(): - if k == 'id' or k == 'children' or k == 'summary' or (foundName and k == 'name'): + if k == 'id' or k == 'children' or k == 'summaryData' or (foundName and k == 'name'): continue rbuf += ' %s="%s"' % (k.upper(), v) if queryargs is not None: @@ -171,7 +155,7 @@ rbuf += '>\n%s</GRID>\n' % self._getGridSummary(element, filterList, queryargs) return rbuf elif 'CLUSTER' == element.id: - rbuf += '>\n%s</CLUSTER>\n' % self._getClusterSummary(element) + rbuf += '>\n%s</CLUSTER>\n' % self._getClusterSummary(element, filterList, queryargs) return rbuf except ValueError: pass @@ -203,7 +187,7 @@ rbuf = '%s\n%s\n' % (self._xml_starttag, self._xml_dtd) ds = DataStore() if ds.rootElement is not None: - ds.lock.acquire() + ds.acquireLock(self) rbuf += self._getXmlImpl(ds.rootElement, filterList, queryargs) - ds.lock.release() + ds.releaseLock(self) return rbuf Modified: trunk/monitor-core/gmetad-python/plugins/rrd_plugin.py =================================================================== --- trunk/monitor-core/gmetad-python/plugins/rrd_plugin.py 2008-05-15 18:02:21 UTC (rev 1352) +++ trunk/monitor-core/gmetad-python/plugins/rrd_plugin.py 2008-05-16 01:09:19 UTC (rev 1353) @@ -34,6 +34,7 @@ import os import rrdtool import logging +from time import time from gmetad_plugin import GmetadPlugin from gmetad_config import getConfig, GmetadConfig @@ -115,10 +116,14 @@ logging.info('Error creating rrd %s - %s'%(rrdPath, str(e))) def _updateRRD(self, clusterNode, metricNode, rrdPath, summary): + if hasattr(clusterNode, 'localtime'): + processTime = clusterNode.localtime + else: + processTime = int(time()) if summary is True: - args = [str(rrdPath), '%s:%s:%s'%(str(clusterNode.localtime),str(metricNode.val),str(metricNode.num))] + args = [str(rrdPath), '%s:%s:%s'%(str(processTime),str(metricNode.sum),str(metricNode.num))] else: - args = [str(rrdPath), '%s:%s'%(str(clusterNode.localtime),str(metricNode.val))] + args = [str(rrdPath), '%s:%s'%(str(processTime),str(metricNode.val))] try: rrdtool.update(*args) #logging.debug('Updated rrd %s with value %s'%(rrdPath, str(metricNode.val))) Modified: trunk/monitor-core/gmetad-python/plugins/rrd_summary_plugin.py =================================================================== --- trunk/monitor-core/gmetad-python/plugins/rrd_summary_plugin.py 2008-05-15 18:02:21 UTC (rev 1352) +++ trunk/monitor-core/gmetad-python/plugins/rrd_summary_plugin.py 2008-05-16 01:09:19 UTC (rev 1353) @@ -34,23 +34,33 @@ import os import rrdtool import logging +import threading +from random import randrange + from rrd_plugin import RRDPlugin from gmetad_config import getConfig, GmetadConfig +from gmetad_data import DataStore def get_plugin(): return RRDSummaryPlugin('rrdsummary') +def getRandomInterval(midpoint, range=5): + return randrange(max(midpoint-range,0), midpoint+range) + class RRDSummaryPlugin(RRDPlugin): def __init__(self, cfgid): - RRDPlugin.__init__(self, cfgid) + RRDPlugin.__init__(self, 'RRD') def start(self): '''Called by the engine during initialization to get the plugin going.''' + self.rootSummary = RRDRootSummary() + self.rootSummary.start() print "RRDSummary start called" def stop(self): '''Called by the engine during shutdown to allow the plugin to shutdown.''' + self.rootSummary.shutdown() print "RRDSummary stop called" def notify(self, clusterNode): @@ -66,10 +76,64 @@ self._checkDir(clusterPath) clusterPath = '%s/__SummaryInfo__'%clusterPath self._checkDir(clusterPath) - for metricNode in clusterNode.summary.itervalues(): + for metricNode in clusterNode.summaryData['summary'].itervalues(): rrdPath = '%s/%s.rrd'%(clusterPath,metricNode.name) if not os.path.isfile(rrdPath): self._createRRD(clusterNode, metricNode, rrdPath, ds.interval, True) #need to do some error checking here if the createRRD failed self._updateRRD(clusterNode, metricNode, rrdPath, True) print "RRDSummary notify called" + + +class RRDRootSummary(threading.Thread, RRDPlugin): + def __init__(self): + threading.Thread.__init__(self) + RRDPlugin.__init__(self, 'RRD') + + self._cond = threading.Condition() + self._running = False + self._shuttingDown = False + + def writeRootSummary(self): + ds = DataStore() + rootNode = ds.rootElement + if rootNode is None: return + ds.acquireLock(self) + try: + gmetadConfig = getConfig() + rootPath = '%s/__SummaryInfo__'%self.cfg[RRDPlugin.RRD_ROOTDIR] + self._checkDir(rootPath) + for gridNode in rootNode: + if not hasattr(gridNode, 'summaryData'): + continue + + for metricNode in gridNode.summaryData['summary'].itervalues(): + rrdPath = '%s/%s.rrd'%(rootPath,metricNode.name) + if not os.path.isfile(rrdPath): + self._createRRD(rootNode, metricNode, rrdPath, 15, True) + #need to do some error checking here if the createRRD failed + self._updateRRD(rootNode, metricNode, rrdPath, True) + except Exception, e: + print e + ds.releaseLock(self) + print "RRDRootSummary called" + + def run(self): + if self._running: + return + self._running = True + while not self._shuttingDown: + self._cond.acquire() + # wait a random time between 10 and 30 seconds + self._cond.wait(getRandomInterval(20, 10)) + self._cond.release() + if not self._shuttingDown: + self.writeRootSummary() + + def shutdown(self): + self._shuttingDown = True + self._cond.acquire() + self._cond.notifyAll() + self._cond.release() + self.join() + This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |