From: Shuang Wu <shu...@gm...> - 2011-09-29 04:41:15
|
Hi, guys, I am writing a ganglia python module to Flume metrics to ganglia. It's pretty much done, running the python script on command line gives all the metrics, however, when I enabled it in ganglia, only some of the metrics was sent, "telnet localhost 8649" confirms that only some of the metrics are known to ganglia. I have been trying to guess what the problem is, but with no luck. Here is my code, please let me know any suggestion you might have: flume_stats.py #!/usr/bin/env python # -*- coding: utf-8 -*- import os import threading import time import traceback import urllib2 import json descriptors = list() JVM_Metrics = { 'mem.heap.init' : 'init heap size', 'mem.heap.max' : 'max heap size', 'mem.heap.used' : 'used heap size', } Node_Metrics = { 'sink.LazyOpenDecorator.BackoffFailover.failsPrimary' : 'sink primary failed events', 'sink.LazyOpenDecorator.BackoffFailover.sentBackups' : 'sink backup sent events', 'sink.LazyOpenDecorator.BackoffFailover.sentPrimary' : 'sink primary sent events', 'sink.LazyOpenDecorator.Collector.AckChecksumChecker.InsistentAppend.appendRequests' : 'requsted events', 'sink.LazyOpenDecorator.Collector.AckChecksumChecker.InsistentAppend.appendRetries' : 'retried events', 'sink.LazyOpenDecorator.Collector.AckChecksumChecker.InsistentAppend.appendSuccessses' : 'succeeded events', 'source.LazyOpenSource.ThriftEventSource.number of bytes' : 'source thrift bytesIn', 'source.LazyOpenSource.ThriftEventSource.number of events' : 'source thrift eventsIn', 'source.LazyOpenSource.TailSource.number of bytes' : 'source tail bytesIn', 'source.LazyOpenSource.TailSource.number of events' : 'source tail eventsIn', } Short_Metric_Name = { 'sink.LazyOpenDecorator.BackoffFailover.failsPrimary' : 'snk.pri.fails', 'sink.LazyOpenDecorator.BackoffFailover.sentBackups' : 'snk.bkup.sent', 'sink.LazyOpenDecorator.BackoffFailover.sentPrimary' : 'snk.pri.sent', 'sink.LazyOpenDecorator.Collector.AckChecksumChecker.InsistentAppend.appendRequests' : 'snk.rqst.evt', 'sink.LazyOpenDecorator.Collector.AckChecksumChecker.InsistentAppend.appendRetries' : 'snk.rtr.evt', 'sink.LazyOpenDecorator.Collector.AckChecksumChecker.InsistentAppend.appendSuccessses' : 'snk.sus.evt', 'source.LazyOpenSource.ThriftEventSource.number of bytes' : 'src.thrift.bytes', 'source.LazyOpenSource.ThriftEventSource.number of events' : 'src.thrift.evt', 'source.LazyOpenSource.TailSource.number of bytes' : 'src.tail.bytes', 'source.LazyOpenSource.TailSource.number of events' : 'src.tail.evt', } _worker_thread = None _lock = threading.Lock() class UpdateFlumeMetricThread(threading.Thread): '''update Flume metrics''' def __init__(self, params): threading.Thread.__init__(self) self.running = False self.exiting = False self.url = params["url"] self.refresh_rate = int(params["refresh_rate"]) self.metric = {} def exit(self): self.exiting = True if not self.running: return self.join() def run(self): self.running = True while not self.exiting: _lock.acquire() self.update_metric() _lock.release() time.sleep(self.refresh_rate) self.running = False def update_metric(self): try: req = urllib2.Request(url = self.url) res = urllib2.urlopen(req) j = json.loads(res.read()) for k in JVM_Metrics.keys(): self.metric['jvm.' + k] = j['jvmInfo'][k] for l_nodes in j['logicalnodes'].keys(): req = urllib2.Request(url = self.url + "/" + l_nodes) res = urllib2.urlopen(req) j_node = json.loads(res.read()) for k_node in Node_Metrics.keys(): if k_node in j_node.keys(): self.metric['node.' + l_nodes[:3] + '.' + Short_Metric_Name[k_node]] = j_node[k_node] except urllib2.URLError: traceback.print_exc() else: res.close() def get_metric(self, name): val = 0 if name in self.metric: _lock.acquire() val = self.metric[name] _lock.release() return val def get_metric(name): return _worker_thread.get_metric(name) def metric_init(params): global descriptors, _worker_thread print '[flume_stats] Received the following parameters' print params if "metric_group" not in params: params["metric_group"] = "flume" if "refresh_rate" not in params: params["refresh_rate"] = 30 if "url" not in params: params["url"] = "http://localhost:35862/node/reports" _worker_thread = UpdateFlumeMetricThread(params) _worker_thread.start() for (k,v) in JVM_Metrics.iteritems(): descriptors.append({ "name" : 'jvm.' + k, "call_back" : get_metric, "time_max" : 60, "value_type" : "uint", "units" : "bytes", "format" : "%u", "description" : v, "groups" : params["metric_group"], }) req = urllib2.Request(params["url"]) res = urllib2.urlopen(req) j = json.loads(res.read()) for l_nodes in j['logicalnodes'].keys(): req = urllib2.Request(params["url"] + "/" + l_nodes) res = urllib2.urlopen(req) j_node = json.loads(res.read()) for k_node in Node_Metrics.keys(): if k_node in j_node: descriptors.append({ "name" : 'node.' + l_nodes[:3] + '.' + Short_Metric_Name[k_node], "call_back" : get_metric, "time_max" : 60, "value_type" : "uint", "units" : "bytes_events", "format" : "%u", "description" : Node_Metrics[k_node], "groups" : params["metric_group"], }) return descriptors def metric_cleanup(): '''Clean up the metric module.''' _worker_thread.exit() #This code is for debugging and unit testing if __name__ == '__main__': try: params = { 'url': 'http://localhost:35862/node/reports', } metric_init(params) while True: for d in descriptors: v = d['call_back'](d['name']) print 'value for %s is %u' % (d['name'], v) time.sleep(30) except KeyboardInterrupt: time.sleep(0.2) os._exit(1) and flume_stats.pyconf: modules { module { name = "flume_stats" language = "python" param url { value = "http://localhost:35862/node/reports" } param refresh_rate { value = 30 } param metric_group { value = "flume" } } } collection_group { collect_every = 30 time_threshold = 60 metric { name_match = "node\.([a-z\.]+)" title = "Flume logicalnode \\1" value_threshold = 0 } metric { name_match = "jvm\.([a-z\.]+)" title = "Flume jvm \\1" value_threshold = 0 } } the output of running flume_stats.py: [flume_stats] Received the following parameters {'url': 'http://localhost:35862/node/reports'} value for jvm.mem.heap.init is 114117632 value for jvm.mem.heap.used is 362908560 value for jvm.mem.heap.max is 1623719936 value for node.apa.src.tail.bytes is 82739062 value for node.apa.src.tail.evt is 387330 value for node.apa.snk.bkup.sent is 1 value for node.apa.snk.pri.fails is 1 value for node.apa.snk.pri.sent is 387329 value for node.jan.src.thrift.evt is 554692327 value for node.jan.snk.bkup.sent is 0 value for node.jan.src.thrift.bytes is 634427896119 value for node.jan.snk.pri.fails is 0 value for node.jan.snk.pri.sent is 554692327 value for jvm.mem.heap.init is 114117632 value for jvm.mem.heap.used is 362908560 value for jvm.mem.heap.max is 1623719936 value for node.apa.src.tail.bytes is 82739062 value for node.apa.src.tail.evt is 387330 value for node.apa.snk.bkup.sent is 1 value for node.apa.snk.pri.fails is 1 value for node.apa.snk.pri.sent is 387329 value for node.jan.src.thrift.evt is 554692327 value for node.jan.snk.bkup.sent is 0 value for node.jan.src.thrift.bytes is 634427896119 value for node.jan.snk.pri.fails is 0 value for node.jan.snk.pri.sent is 554692327 ... Shuang |