From: <bni...@us...> - 2008-05-24 00:30:16
|
Revision: 1364 http://ganglia.svn.sourceforge.net/ganglia/?rev=1364&view=rev Author: bnicholes Date: 2008-05-23 17:30:23 -0700 (Fri, 23 May 2008) Log Message: ----------- Detect when the cluster is down so that the summary data can be fixed up appropriately. This includes a new tag called STATUS, which could be used by the web frontend to determine when the cluster is unavailable. Also calculate the time offset TN between the reported time and the reporting timestamps. Modified Paths: -------------- trunk/monitor-core/gmetad-python/gmetad_data.py trunk/monitor-core/gmetad-python/gmetad_gmondReader.py Modified: trunk/monitor-core/gmetad-python/gmetad_data.py =================================================================== --- trunk/monitor-core/gmetad-python/gmetad_data.py 2008-05-24 00:28:07 UTC (rev 1363) +++ trunk/monitor-core/gmetad-python/gmetad_data.py 2008-05-24 00:30:23 UTC (rev 1364) @@ -92,13 +92,27 @@ clusterNode.summaryData['summary'] = {} clusterNode.summaryData['hosts_up'] = 0 clusterNode.summaryData['hosts_down'] = 0 - + clusterUp = clusterNode.status == 'up' + summaryTime = int(time.time()) + # Summarize over each host contained by the cluster for hostNode in clusterNode: + reportedTime = summaryTime # Sum up the status of all of the hosts if 'HOST' == hostNode.id: + # Calculate the difference between the last known reported time + # and the current time. This determines if the host is up or down + # ** There may still be some issues with the way that this calculation is done + # ** The metric node below may also have the same issues. 
+ reportedTime = int(hostNode.reported) + tn = int(hostNode.tn) + if tn > int(hostNode.tmax): + tn = int(hostNode.tmax) + tn = (summaryTime - tn) - reportedTime + if tn < 0: tn = 0 + hostNode.tn = str(tn) try: - if int(hostNode.tn) < int(hostNode.tmax)*4: + if clusterUp and (int(hostNode.tn) < int(hostNode.tmax)*4): clusterNode.summaryData['hosts_up'] += 1 else: clusterNode.summaryData['hosts_down'] += 1 @@ -108,6 +122,12 @@ pass # Summarize over each metric within a host for metricNode in hostNode: + tn = int(metricNode.tn) + if tn > int(metricNode.tmax): + tn = int(metricNode.tmax) + tn = (summaryTime - tn) - reportedTime + if tn < 0: tn = 0 + metricNode.tn = str(tn) # Don't include metrics that can not be summarized if metricNode.type in ['string', 'timestamp']: continue @@ -178,14 +198,14 @@ pass return node - def updateFinished(self, clusterPath=[]): + def updateFinished(self, clusterNode): ''' This method is called when the gmond reader has finished updating a cluster. It indicates that a summary can be done over the entire cluster and than the cluster transaction needs to be entered and passed to the plugins. ''' - clusterNode = self.getNode(clusterPath) - self._doSummary(clusterNode); - self.notifier.insertTransaction(clusterNode) + if clusterNode is not None: + self._doSummary(clusterNode); + self.notifier.insertTransaction(clusterNode) def acquireLock(self, obj): ''' Acquire a data store lock. ''' Modified: trunk/monitor-core/gmetad-python/gmetad_gmondReader.py =================================================================== --- trunk/monitor-core/gmetad-python/gmetad_gmondReader.py 2008-05-24 00:28:07 UTC (rev 1363) +++ trunk/monitor-core/gmetad-python/gmetad_gmondReader.py 2008-05-24 00:30:23 UTC (rev 1364) @@ -101,16 +101,18 @@ return (hostinfo[0], port) def run(self): + ds = DataStore() while not self._shuttingDown: + connected = False # Create a socket and connect to the cluster data source. 
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.connect( self._getEndpoint(self.dataSource.hosts[self.lastKnownGoodHost]) ) + connected = True except socket.error: # Keep track of the last good data source within the cluster. If we can't reconnect to the # same data source, try the next one in the list. curidx = self.lastKnownGoodHost - connected=False while True: curidx += 1 if curidx >= len(self.dataSource.hosts): @@ -123,25 +125,47 @@ break except socket.error: pass - if not connected: - logging.error('Could not connect to any host for data source %s' % self.dataSource.name) - return - logging.info('Quering data source %s via host %s' % (self.dataSource.name, self.dataSource.hosts[self.lastKnownGoodHost])) - xmlbuf = '' - while True: - # Read all of the XML data from the data source. - buf = sock.recv(8192) - if not buf: - break - xmlbuf += buf - sock.close() + if connected: + logging.info('Quering data source %s via host %s' % (self.dataSource.name, self.dataSource.hosts[self.lastKnownGoodHost])) + xmlbuf = '' + while True: + # Read all of the XML data from the data source. + buf = sock.recv(8192) + if not buf: + break + xmlbuf += buf + sock.close() + # Create an XML parser and parse the buffer + gch = GmondContentHandler() + xml.sax.parseString(xmlbuf, gch) + # Notify the data store that all updates for the cluster are finished. 
+ clusterNode = ds.getNode(gch.getClusterAncestry()) + try: + clusterNode.status = 'up' + except Exception: + pass + else: + logging.error('Could not connect to any host for data source %s' % self.dataSource.name) + ds = DataStore() + cfg = getConfig() + gridKey = Element.generateKey(['GRID',cfg[GmetadConfig.GRIDNAME]]) + clusterKey = Element.generateKey(['CLUSTER', self.dataSource.name]) + gridNode = ds.getNode([str(ds.rootElement), gridKey]) + clusterNode = None + if gridNode is not None and str(gridNode) == gridKey: + try: + clusterNode = gridNode[clusterKey] + except KeyError: + clusterNode = Element('CLUSTER', {'NAME':self.dataSource.name, 'LOCALTIME':'%d' % time.time()}) + ds.setNode(clusterNode, gridNode) + if clusterNode is not None: + clusterNode.status = 'down' + #clusterNode.localtime = time.time() + + ds.updateFinished(clusterNode) + if self._shuttingDown: break - # Create an XML parser and parse the buffer - gch = GmondContentHandler() - xml.sax.parseString(xmlbuf, gch) - # Notify the data store that all updates for the cluster are finished. - DataStore().updateFinished(gch.getClusterAncestry()) # Go to sleep for a while. self._cond.acquire() self._cond.wait(getRandomInterval(self.dataSource.interval)) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |