[Gug-cvs] gug/gug/module/dm/resource __init__.py, NONE, 1.1 abstract.py, NONE, 1.1 storage.py, NONE
Status: Planning
Brought to you by:
szferi
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:02
|
Update of /cvsroot/gug/gug/gug/module/dm/resource In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/module/dm/resource Added Files: __init__.py abstract.py storage.py computing.py Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM --- NEW FILE: computing.py --- ''' Computing Resource backend''' __revision__ = '$Revision: 1.1 $' from gug.common import log from gug.common.jsdl import * from gug.common.jsdl_parser import * from gug.common.share import nice_exception from gug.common.ogsa_bes import * from gug.common.simplexml import SimpleXML, SimpleXMLError from gug.client.soap import get_client from gug.module.dm.resource.abstract import AbstractResource import time import urlparse class ComputingResource(AbstractResource): exact_test = ['operatingsystem', 'architecture'] number_test = ['totalcpucount', 'totalphysicalmemory', 'totalvirtualmemory', 'checkpoint', \ 'exclusiveexecution'] def _query_resources(self, resource_list, type): """ Get Computing Element resources from local GIS """ try: gis_list = self.gis_proxy.get({'resourcetype' : type}) except Exception, error: log.error() for res in gis_list: meta_data = res[0] id = meta_data['sid'] meta_data['timestamp'] = time.time() if resource_list.has_key(id) and \ int(resource_list[id][0]['timelife']) >= int(meta_data['timelife']): print 'resource (%s) timelife (%s) is smaller than in the cache (%s)' % \ (id, meta_data['timelife'], resource_list[id][0]['timelife']) continue data = res[1] try: service_desc = SimpleXML(memory = data) path = '/ServiceDescription/' + type ce_data = service_desc.get_dictionaries(path)[0] if not resource_list.has_key(id) or not resource_list[id][1]: log.msg('Computing Element data arrived from %s.' % id) except SimpleXMLError, error: log.error('No Computing Element data from %s.' % id) continue try: path = '/ServiceDescription/Capabilities' cap_data = service_desc.get_dictionaries(path)[0] except SimpleXMLError, error: cap_data = {} resource_list[id] = (meta_data, ce_data, cap_data) print 'available resources:', ', '.join(resource_list.keys()) '''Check the resource that it meets the job's requirements''' def _test_candidate(self, job_desc, res_desc): if job_desc.has_key('candidatehosts'): hosts = [url for url in res_desc['urls'] if urlparse.urlparse(url)[1].split(':')[0] in job_desc['candidatehosts']] if len(hosts) == 0: print 'candidate (%s) failed because of: CandidateHosts:' \ % res_desc['name'], job_desc['candidatehosts'] return False for i in self.exact_test: if job_desc.has_key(i) and res_desc.has_key(i): if job_desc[i].upper() != '*' and job_desc[i].upper() != res_desc[i].upper(): print 'candidate (%s) failed because of:' % res_desc['name'], i return False for i in self.number_test: if job_desc.has_key(i) and res_desc.has_key(i): if float(job_desc[i]) > float(res_desc[i]): print 'candidate (%s) failed because of:' % res_desc['name'], i return False return True class ClusterManager(ComputingResource): def change_states(self, ids): jobs = ids.select({'status' : BES_OVERALL_STATE_UNKNOWN }) for job in jobs: if time.time() - job['last_check_time'] >= self.config['TimeOut']: log.msg('TIMEOUT! Job reschedule: %s' % job['grid_id']) job['status'] = BES_OVERALL_STATE_NEW ids.commit(job['grid_id']) def query_resources(self): self._query_resources(self.dm.resource_list, 'ClusterElement') for res in self.dm.resource_list.keys(): if self.dm.resource_list[res][1]['OperatingSystem'].upper() != self.config['OperatingSystem'].upper() or\ self.dm.resource_list[res][1]['CPUArchitecture'].upper() != self.config['Architecture'].upper(): del self.dm.resource_list[res] continue try: proxy = get_client(self.dm.resource_list[res][0]['urls'][0]) self.dm.resource_list[res][1]['FreeJobPlace'] = proxy.free_job_place() except Exception, error: 'cannot get exec\'s (%s) freejobplace:' % res, nice_exception(error) def get_scheduling_items(self, ids): return ids.select({ 'status' : BES_OVERALL_STATE_NEW }) def generate_candidate_set(self, job): job_desc = job['desc'] for res in self.dm.resource_list.keys(): res_desc = self._create_desc(self.dm.resource_list[res]) if not self.dm.blacklisted(res) and self._test_candidate(job_desc, res_desc) and\ self.dm.resource_list[res][1].has_key('FreeJobPlace') and\ self.dm.resource_list[res][1]['FreeJobPlace'] > 0: yield res return def get_next_candidate(self, candidate, id): actual = None try: while True: actual = candidate.next() self.dm.resource_list[actual][1]['FreeJobPlace'] = \ self.dm.resource_list[actual][1]['FreeJobPlace'] - 1 yield actual self.dm.resource_list[actual][1]['FreeJobPlace'] = \ self.dm.resource_list[actual][1]['FreeJobPlace'] + 1 except: pass return def _create_desc(self, res): desc = {} desc['name'] = res[0]['sid'] desc['urls'] = res[0]['urls'] desc['operatingsystem'] = res[1]['OperatingSystem'] desc['architecture'] = res[1]['CPUArchitecture'] desc['totalcpucount'] = res[1]['CPUCount'] desc['totalphysicalmemory'] = res[1]['FreePhysicalMemory'] desc['totalvirtualmemory'] = res[1]['FreeVirtualMemory'] desc['checkpoint'] = res[1]['CheckPoint'] if int(res[1]['ProcessPerCPU']) == 1: desc['exclusiveexecution'] = 1 else: desc['exclusiveexecution'] = 0 return desc class SuperScheduler(ComputingResource): def query_resources(self): self._query_resources(self.dm.resource_list, 'ComputingElement') def generate_candidate_set(self, job): job_desc = create_desc_from_jsdl(job['jsdl']) for res in self.dm.resource_list.keys(): res_desc = self._create_desc(self.dm.resource_list[res]) if not self.dm.blacklisted(res) and self._test_candidate(job_desc, res_desc) and\ self.dm.resource_list[res][1].has_key('FreeCpuNum') and\ (int(self.dm.resource_list[res][1]['FreeCpuNum']) - int(self.dm.resource_list[res][1]['NewJobNumber'])) > 0: yield res return def get_scheduling_items(self, ids): return ids.select({ 'state' : BES_OVERALL_STATE_NEW }) def get_next_candidate(self, candidate, id): actual = None try: while True: actual = candidate.next() self.dm.resource_list[actual][1]['FreeCpuNum'] = \ str(int(self.dm.resource_list[actual][1]['FreeCpuNum']) - 1) yield actual self.dm.resource_list[actual][1]['FreeCpuNum'] = \ str(int(self.dm.resource_list[actual][1]['FreeCpuNum']) + 1) except: pass #if actual: # self.dm.resource_list[actual][0]['FreeCpuNum'] = \ # str(int(self.dm.resource_list[actual][0]['FreeCpuNum']) + 1) return def get_resource_filter(self, id): return id['jsdl'] def _create_desc(self, res): desc = {} desc['name'] = res[0]['sid'] desc['urls'] = res[0]['urls'] desc['operatingsystem'] = res[1]['OperatingSystem'] desc['architecture'] = res[1]['Architecture'] desc['totalcpucount'] = res[1]['CpuNum'] return desc --- NEW FILE: abstract.py --- ''' Abtrac Resource backend interface''' __revision__ = '$Revision: 1.1 $' from gug.common.idstore import AbstractQueue class AbstractResource: '''DM resource backend's base Class''' def __init__(self, dm, gis_proxy, config): self.gis_proxy = gis_proxy self.dm = dm self.config = config '''It will change the queue's elements states if needed''' def change_states(self, queue): pass '''Queries resources from local GIS''' def query_resources(self): pass '''Returns lost if those ids, which should be scheduled''' def get_scheduling_items(self, queue): return queue.get_all() '''Returns a candidate set generator object, that will iterates the good resources that meets the queue id's requirements''' def generate_candidate_set(self, id): pass '''Returns a generator object, which will get the next candidate and change there's attributes if needed''' def get_next_candidate(self, candidate, id): pass def get_resource_filter(self, id): return None --- NEW FILE: storage.py --- ''' Storage Resource backend''' __revision__ = '$Revision: 1.1 $' from gug.common import log from gug.common.share import nice_exception from gug.common.simplexml import SimpleXML, SimpleXMLError from gug.client.soap import get_client from gug.module.dm.resource.abstract import AbstractResource import time class StorageManager(AbstractResource): def change_states(self, queue): pass def query_resources(self): """ Get Storage Element resources from local GIS """ try: gis_list = self.gis_proxy.get({'resourcetype' : 'StorageElement'}) except Exception, error: log.error() for res in gis_list: meta_data = res[0] id = meta_data['sid'] meta_data['timestamp'] = time.time() if self.dm.resource_list.has_key(id) and \ int(self.dm.resource_list[id][0]['timelife']) >= int(meta_data['timelife']): print 'resource (%s) timelife (%s) is smaller than in the cache (%s)' % \ (id, meta_data['timelife'], self.dm.resource_list[id][0]['timelife']) continue data = res[1] try: service_desc = SimpleXML(memory = data) path = '/ServiceDescription/StorageElement' se_data = service_desc.get_dictionaries(path)[0] if not self.dm.resource_list.has_key(id) or not self.dm.resource_list[id][1]: log.msg('Storage Element data arrived from %s.' % id) except SimpleXMLError, error: se_data = {} log.error('No Storage Element data from %s.' % id) self.dm.resource_list[id] = (meta_data, se_data) print 'available resources:', ', '.join(self.dm.resource_list.keys()) def get_scheduling_items(self, queue): return queue.get_all() def generate_candidate_set(self, id): for res in self.dm.resource_list.keys(): if not self.dm.blacklisted(res) and \ int(self.dm.resource_list[res][1]['DataStorageFree']) > id: yield res return def get_next_candidate(self, candidate, id): actual = None try: while True: actual = candidate.next() self.dm.resource_list[actual][1]['DataStorageFree'] = \ str(int(self.dm.resource_list[actual][1]['DataStorageFree']) - id) yield actual self.dm.resource_list[actual][1]['DataStorageFree'] = \ str(int(self.dm.resource_list[actual][1]['DataStorageFree']) + id) except: return --- NEW FILE: __init__.py --- '''Resource Interface backends for DM''' __revision__ = '$Revision: 1.1 $' |