[Gug-cvs] gug/gug/service/sched/resource_iface job_controller.py, 1.20, 1.21
Status: Planning
Brought to you by:
szferi
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:29
|
Update of /cvsroot/gug/gug/gug/service/sched/resource_iface
In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/service/sched/resource_iface

Modified Files:
    job_controller.py
Log Message:
Adding Decision Maker module
It is Integrated into the following services: Sched, CM, StM

Index: job_controller.py
===================================================================
RCS file: /cvsroot/gug/gug/gug/service/sched/resource_iface/job_controller.py,v
retrieving revision 1.20
retrieving revision 1.21
diff -C2 -d -r1.20 -r1.21
*** job_controller.py 3 Apr 2007 12:48:46 -0000 1.20
--- job_controller.py 27 Sep 2007 10:36:07 -0000 1.21
***************
*** 22,26 ****
              jc = id_or_jc
          if jc:
!             meta_res = self.resource_list.get(jc.split('/')[0],None)
              if meta_res:
                  try:
--- 22,26 ----
              jc = id_or_jc
          if jc:
!             meta_res = self.dm.resource_list.get(jc.split('/')[0],None)
              if meta_res:
                  try:
***************
*** 49,59 ****
              metadata_node.appendChild(node)

!     def __init__(self, gis_proxy, config):
          self.gis = gis_proxy
          self.config = config
!         self.resource_list = {}
!         self.blacklist = {}
!         self.blacklist_initial_value = 10

      def get_resources(self):
          """ Get Computing Element resources from local GIS """
--- 49,61 ----
              metadata_node.appendChild(node)

!     def __init__(self, gis_proxy, dm, config):
          self.gis = gis_proxy
          self.config = config
!         self.dm = dm
!         #self.resource_list = {}
!         #self.blacklist = {}
!         #self.blacklist_initial_value = 10

+     #TODO: Unused method
      def get_resources(self):
          """ Get Computing Element resources from local GIS """
***************
*** 93,97 ****
      def submit(self, resource_id, ids, schedids):
          try:
!             url = self.resource_list[resource_id][0]['urls'][0]
          except Exception, error:
              return False
--- 95,99 ----
      def submit(self, resource_id, ids, schedids):
          try:
!             url = self.dm.resource_list[resource_id][0]['urls'][0]
          except Exception, error:
              return False
***************
*** 104,110 ****
                  if rejected:
                      raise Exception, 'job rejected'
!                 if self.blacklist.get(resource_id,0) > 0:
                      print 'jc (%s) is blacklisted (%d), cannot submit:' \
!                         % (jc_name, self.blacklist[resource_id]), id['grid_id']
                      continue
                  log.msg('Submitting job to %s:' % jc_name, id['grid_id'])
--- 106,112 ----
                  if rejected:
                      raise Exception, 'job rejected'
!                 if self.dm.blacklisted(resource_id):
                      print 'jc (%s) is blacklisted (%d), cannot submit:' \
!                         % (jc_name, self.dm.blacklist[resource_id]), id['grid_id']
                      continue
                  log.msg('Submitting job to %s:' % jc_name, id['grid_id'])
***************
*** 121,124 ****
--- 123,131 ----
                  if id['jobtype'] == 'Compiler':
                      id['architectures'][jc_arch][jc_os]['state'] = COMPILER_STATE_COMPILING
+             except (socket.error, UnknownServiceError), error:
+                 id['error'] = nice_exception(error) + ' (jc is %s)' % jc_name
+                 log.error('Cannot submit job: %s (%s)' % (id['grid_id'], id['error']))
+                 self.dm.put_blacklist(resource_id)
+                 print 'jc (%s) is blacklisted' % jc_name
              except Exception, error:
                  id['error'] = nice_exception(error) + ' (jc is %s)' % jc_name
***************
*** 126,134 ****
                      log.error('Cannot submit job: %s (%s)' % \
                          (id['grid_id'], id['error']) )
-                     if str(error).find('Connection refused') > -1 \
-                         or str(error).find('timed out') > -1 \
-                         or str(error).find('sorry, no service is running here') > -1:
-                         self.blacklist[resource_id] = self.blacklist_initial_value
-                         print 'jc (%s) is blacklisted' % jc_name
                  else:
                      print 'Cannot submit job: %s (%s)' % (id['grid_id'], id['error'])
--- 133,136 ----
***************
*** 151,155 ****
              jcid = id['job_id'].split('/')[0]
              try:
!                 url = self.resource_list[jcid][0]['urls'][0]
              except:
                  continue
--- 153,157 ----
              jcid = id['job_id'].split('/')[0]
              try:
!                 url = self.dm.resource_list[jcid][0]['urls'][0]
              except:
                  continue
***************
*** 157,163 ****
              jc_name = self._jc_name(jcid)
              #print statechangexml
!             if self.blacklist.get(jcid, 0) > 0:
                  print 'jc (%s) is blacklisted (%d), cannot send kill:' \
!                     % (jc_name, self.blacklist[jcid]), id['grid_id']
              else:
                  log.msg('Asking resource (%s) to remove job (id on resource is %s):' \
--- 159,165 ----
              jc_name = self._jc_name(jcid)
              #print statechangexml
!             if self.dm.blacklisted(jcid):
                  print 'jc (%s) is blacklisted (%d), cannot send kill:' \
!                     % (jc_name, self.dm.blacklist[jcid]), id['grid_id']
              else:
                  log.msg('Asking resource (%s) to remove job (id on resource is %s):' \
***************
*** 191,197 ****

      def status(self, ids, schedids):
-         for url in self.blacklist.keys():
-             if self.blacklist[url] > 0:
-                 self.blacklist[url] = self.blacklist[url] - 1
          ids_valid = [ id for id in ids if id.get('job_id', None) ]

--- 193,196 ----
***************
*** 204,208 ****
          for jcid in jcs.keys():
              try:
!                 url = self.resource_list[jcid][0]['urls'][0]
              except:
                  log.error('Resource not found (id: %s)' % jcid)
--- 203,207 ----
          for jcid in jcs.keys():
              try:
!                 url = self.dm.resource_list[jcid][0]['urls'][0]
              except:
                  log.error('Resource not found (id: %s)' % jcid)
***************
*** 216,223 ****
              jc = get_client(url)
              jc_name = self._jc_name(jcid)
!             if self.blacklist.get(jcid,0) > 0:
                  for id in jcs[jcid]:
                      print 'jc (%s) is blacklisted (%d), cannot get status:' \
!                         % (jc_name, self.blacklist[jcid]), id['grid_id']
                      id['status_long'] = None
                      if id['state'] == BES_OVERALL_STATE_RUNNING:
--- 215,222 ----
              jc = get_client(url)
              jc_name = self._jc_name(jcid)
!             if self.dm.blacklisted(jcid):
                  for id in jcs[jcid]:
                      print 'jc (%s) is blacklisted (%d), cannot get status:' \
!                         % (jc_name, self.dm.blacklist[jcid]), id['grid_id']
                      id['status_long'] = None
                      if id['state'] == BES_OVERALL_STATE_RUNNING:
***************
*** 230,241 ****
                  try:
                      sls = jc.GetActivityStatus(jc_ids.keys())
!                 except Exception, error:
                      nice_error = nice_exception(error)
                      print 'cannot get status from %s (%s)' % (jc_name, nice_error)
!                     if str(error).find('Connection refused') > -1 \
!                         or str(error).find('timed out') > -1 \
!                         or str(error).find('sorry, no service is running here') > -1:
!                         self.blacklist[jcid] = self.blacklist_initial_value
!                         print 'jc (%s) is blacklisted' % jc_name
                      for id in jcs[jcid]:
                          id['status_long'] = None
--- 229,241 ----
                  try:
                      sls = jc.GetActivityStatus(jc_ids.keys())
!                 except (socket.error, UnknownServiceError), error:
                      nice_error = nice_exception(error)
                      print 'cannot get status from %s (%s)' % (jc_name, nice_error)
!                     self.dm.put_blacklist(jcid)
!                     print 'jc (%s) is blacklisted' % jc_name
!                     continue
!                 except Exception, error:
!                     nice_error = nice_exception(error)
!                     print 'cannot get status from %s (%s)' % (jc_name, nice_error)
                      for id in jcs[jcid]:
                          id['status_long'] = None
***************
*** 284,288 ****
              schedids.commit(id['grid_id'])
          return True
!
      def get_new_jobs(self, schedids):
          ids = schedids.select({ 'state' : BES_OVERALL_STATE_NEW })
--- 284,289 ----
              schedids.commit(id['grid_id'])
          return True
!
!     #TODO: delete - unused method
      def get_new_jobs(self, schedids):
          ids = schedids.select({ 'state' : BES_OVERALL_STATE_NEW })
***************
*** 300,312 ****
              return False
          return ids
!
      def get_valid_resources(self):
          vr = {}
!         for res in self.resource_list.keys():
!             if self.resource_list[res][1].has_key('FreeCpuNum'):
!                 if (int(self.resource_list[res][1]['FreeCpuNum']) > 0) and \
!                     (self.blacklist.get(res, 0) == 0):
                      vr[res] = []
!                     vr[res].append(self.resource_list[res][1])
          return vr

--- 301,314 ----
              return False
          return ids
!
!     #TODO: delete - unused method
      def get_valid_resources(self):
          vr = {}
!         for res in self.dm.resource_list.keys():
!             if self.dm.resource_list[res][1].has_key('FreeCpuNum'):
!                 if (int(self.dm.resource_list[res][1]['FreeCpuNum']) > 0) and \
!                     (self.dm.blacklist.get(res, 0) == 0):
                      vr[res] = []
!                     vr[res].append(self.dm.resource_list[res][1])
          return vr

|