[Gug-cvs] gug/gug/service/cm cm.conf.default, 1.9, 1.10 cm.py, 1.41, 1.42
Status: Planning
Brought to you by:
szferi
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:11
|
Update of /cvsroot/gug/gug/gug/service/cm In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/service/cm Modified Files: cm.conf.default cm.py Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM Index: cm.conf.default =================================================================== RCS file: /cvsroot/gug/gug/gug/service/cm/cm.conf.default,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** cm.conf.default 22 Apr 2007 21:27:00 -0000 1.9 --- cm.conf.default 27 Sep 2007 10:36:06 -0000 1.10 *************** *** 5,8 **** --- 5,12 ---- </CKPT> <SchedulerPeriod>20</SchedulerPeriod> + <DecisionMaker> + <ResourceInterface>gug.module.dm.resource.computing.ClusterManager</ResourceInterface> + <Algorithm>gug.module.dm.algorithm.simple.RandomOrder</Algorithm> + </DecisionMaker> <TimeOut>600</TimeOut> <DBConnect>cm_data</DBConnect> Index: cm.py =================================================================== RCS file: /cvsroot/gug/gug/gug/service/cm/cm.py,v retrieving revision 1.41 retrieving revision 1.42 diff -C2 -d -r1.41 -r1.42 *** cm.py 13 May 2007 22:13:56 -0000 1.41 --- cm.py 27 Sep 2007 10:36:06 -0000 1.42 *************** *** 16,20 **** --- 16,22 ---- from gug.common.simplexml import SimpleXML, SimpleXMLError import urlparse + import socket from gug.common.jsdl_parser import * + from gug.module.dm.dm import DecisionMaker *************** *** 25,29 **** """ ! def __init__(self, initdata): --- 27,33 ---- """ ! ! cm_run = False ! def __init__(self, initdata): *************** *** 45,48 **** --- 49,57 ---- self.os = self.config.get_content(path2)[0].strip() storage_path = self.config.get_content('/ServiceConfig/DBConnect')[0] + #Initializing Decision Maker + dm_cfg = self.config.get_dictionaries('/ServiceConfig/DecisionMaker')[0] + dm_cfg.update({'Architecture' : self.arch, 'OperatingSystem' : self.os, 'TimeOut' : self.timeout}) + self.dm = DecisionMaker(self.gis, dm_cfg) + self.cmids = IDStore(storage_path) *************** *** 51,55 **** self.ownedcpunum = 0 self.runningjob = 0 ! self.cpucount = {} self.lastexeceslog = '' --- 60,64 ---- self.ownedcpunum = 0 self.runningjob = 0 ! #self.cpucount = {} self.lastexeceslog = '' *************** *** 59,63 **** create_provision_task(initdata, self._get_description, message_for_logging = 'CM') ! def _check(self, job): if time.time() - job['last_check_time'] >= self.timeout: --- 68,72 ---- create_provision_task(initdata, self._get_description, message_for_logging = 'CM') ! #TODO: delete this, it is unused now def _check(self, job): if time.time() - job['last_check_time'] >= self.timeout: *************** *** 65,68 **** --- 74,78 ---- job['status'] = BES_OVERALL_STATE_NEW + #TODO: delete this, it is unused now def _timeout_test(self): """ *************** *** 130,134 **** raise print 'Timeout test completted!' ! def _candidate_generator(self, good_proxies, job_id): print 'looking for candidate for job', job_id --- 140,205 ---- raise print 'Timeout test completted!' ! ! def _sched_status(self): ! """ ! This procedure will check the running jobs status, if there is error ! with the job, then the resource will be blacklisted ! """ ! ids = self.cmids.select({'status' : [BES_OVERALL_STATE_RUNNING, BES_OVERALL_STATE_UNKNOWN]}) ! #ids_valid = [ id for id in ids if id.get('exec_job_id', None) ] ! execes = {} ! error = False ! for job in ids: ! exec_id = job['exec_id'] ! if not execes.has_key(exec_id): ! execes[exec_id] = [] ! execes[exec_id].append(job) ! ! for exec_id in execes.keys(): ! error = False ! try: ! if self.dm.blacklisted(exec_id): ! raise Exception, 'exec (%s) on blacklist (%s)' % (exec_id, self.dm.blacklist[exec_id]) ! exec_proxy = self.execes[exec_id] ! statuses = exec_proxy.status([id['exec_job_id'] for id in execes[exec_id]]) ! for status in statuses: ! id = execes[exec_id][statuses.index(status)] ! id['last_check_time'] = time.time() ! id['mem_usage'] = status['mem_usage'] ! id['cpu_usage'] = status['cpu_usage'] ! id['exit_code'] = status['exit_code'] ! new_status =status['status'] ! if id['status'] != new_status: ! log.msg('job\'s status changed from %s to %s: %s' % \ ! (overall_states[id['status']], overall_states[new_status], id['grid_id'])) ! id['status'] = new_status ! if status == BES_OVERALL_STATE_TERMINATED: ! try: ! id['error_message'] = \ ! file(os.path.join(id['desc']['initialdir'],id['desc']['error'])).read() ! except IOError, error: #There is no error file ! pass ! except: ! log.error() ! log.info('Job (%s) FAILED on: %s' % (id['grid_id'], exec_id)) ! self.cmids.commit(id['grid_id']) ! except KeyError: ! print 'error while getting statuses from (%s): Exec not found!' % exec_id ! error = True ! except (socket.error, UnknownServiceError), e: ! print 'error while getting statuses from (%s): Connection problem (%s)' % (exec_id, e) ! print 'exec (%s) is blacklisted!' % exec_id ! self.dm.put_blacklist(exec_id) ! error = True ! except Exception, e: ! print 'error while getting statuses from (%s) :' % exec_id, e ! error = True ! ! if error: ! for id in execes[exec_id]: ! id['status'] = BES_OVERALL_STATE_UNKNOWN ! self.cmids.commit(id['grid_id']) ! ! #TODO: delete, not used def _candidate_generator(self, good_proxies, job_id): print 'looking for candidate for job', job_id *************** *** 199,292 **** _sched() """ ! ! try: ! self._sched_delete() self._get_exec_proxies() ! self._timeout_test() ! ! good_proxies = self._choice_proxy() ! if len(good_proxies) == 0: ! return ! ! for item in self.cmids.select({'status' : BES_OVERALL_STATE_NEW}): ! result = False ! candidates = self._candidate_generator(good_proxies, item['grid_id']) ! print 'list of candidates for job %s:' % item['grid_id'], candidates ! if len(candidates) == 0: ! continue ! while len(candidates) > 0: ! random.shuffle(candidates) ! exec_id = candidates.pop() ! if item['desc'].has_key('candidatehosts'): ! if urlparse.urlparse(self.execes[exec_id]._url)[1].split(':')[0] \ ! not in item['desc']['candidatehosts']: ! print 'This host is not in the CandidateHosts list: %s' % self.execes[exec_id]._url ! continue ! try: ! if item['exec_job_id'] != False and item['desc']['batchjob'] == False and item['desc']['checkpoint'] == '1': ! try: ! item['status'] = BES_OVERALL_STATE_MIGRATION ! result = good_proxies[exec_id]['proxy'].revoke_repair(item['exec_job_id'], item['desc']) ! if result == True: ! item['status'] = BES_OVERALL_STATE_RUNNING ! item['cpu_usage'] = 'Unknown' ! item['mem_usage'] = 'Unknown' ! result = item['exec_job_id'] ! if item['exec_id'] != exec_id: ! log.msg('The %s job MIGRATED from [Execid: %s] to [Execid: %s]' % (item['grid_id'], item['exec_id'], exec_id)) ! log.msg('Job REVOKED: %s'% item['grid_id']) ! except JobNotSuspended: ! item['status'] = BES_OVERALL_STATE_RUNNING ! item['cpu_usage'] = 'Unknown' ! log.msg('Job REVOKED: %s'% item['grid_id']) ! item['mem_usage'] = 'Unknown' ! result = item['exec_job_id'] ! log.msg('The %s Exec service is comes back, the % job is already running!' % (item['exec_id'], item['grid_id'])) ! except RevokeError, error: ! log.msg('I cannot repair this job: %s, reason: %s' % (item['grid_id'], error)) ! item['status'] = BES_OVERALL_STATE_NEW ! except: ! log.error() ! item['status'] = BES_OVERALL_STATE_NEW ! if result == False: ! result = good_proxies[exec_id]['proxy'].start(item['desc']) ! if result != False: ! good_proxies[exec_id]['free_job_place'] -= 1 ! if good_proxies[exec_id]['free_job_place'] <= 0: ! del good_proxies[exec_id] ! print 'no more free_job_place on exec', exec_id ! break ! except AttributeError, error: ! log.error("One or more attribute failures from the JSDL: %s", error) ! item['status'] = BES_OVERALL_STATE_TERMINATED ! log.error('Job TERMINATED: %s' % item['grid_id']) ! except NotAcceptingNewActivities: ! result = False ! except InitialDirectoryDoesNotExist, error: ! log.error("The job's (%s) initialdirectory does not exist: %s" % (item['grid_id'],error)) ! item['error_message'] = 'The job\'s initialdir does not exist: %s' % error ! item['status'] = BES_OVERALL_STATE_TERMINATED ! log.error('Job FAILED: %s' % item['grid_id']) ! except: ! log.error() ! ! if result == False: ! print 'cannot submit job', item['grid_id'] ! else: ! item['exec_job_id'] = result ! item['desc']['job_migration'] = item['desc']['job_migration'] + 1 ! item['exec_id'] = exec_id ! item['exec_submit_time'] = time.time() ! item['status'] = BES_OVERALL_STATE_RUNNING ! log.msg('Job %s submitted to %s (job ID on Exec is %s).' \ ! % (item['grid_id'], item['exec_id'], item['exec_job_id'])) ! self.cmids.commit(item['grid_id']) except Exception, error: ! log.error() def _choice_proxy(self): """ --- 270,464 ---- _sched() """ ! if self.cm_run is True: ! print 'ClusterManager._sched is still running...' ! else: ! self.cm_run = True ! print 'ClusterManager._sched is starting...' ! self.dm.query_resources() self._get_exec_proxies() ! self._sched_status() ! ! map = self.dm.make_decisions(self.cmids) ! while len(map) > 0: ! try: ! for job_id in map.keys(): ! res = map[job_id].next() ! if self._sched_submit(self.cmids.get(job_id), res): ! del map[job_id] ! except: ! del map[job_id] ! self._sched_delete() ! print 'ClusterManager._sched is done.' ! self.cm_run = False ! #TODO: rewrite this method like in shed._sched_sbumit to achieving batch submit ! def _sched_submit(self, item, exec_id): ! result = False ! try: ! if self.dm.blacklisted(exec_id): ! raise Exception, 'Exec (%s) is on blacklist (%s), cannot submit job:' % \ ! (exec_id, self.dm.blacklist[exec_id]), item['grid_id'] ! if item['exec_job_id'] != False and item['desc']['batchjob'] == False and item['desc']['checkpoint'] == '1': ! try: ! item['status'] = BES_OVERALL_STATE_MIGRATION ! result = self.execes[exec_id].revoke_repair(item['exec_job_id'], item['desc']) ! if result == True: ! item['status'] = BES_OVERALL_STATE_RUNNING ! item['cpu_usage'] = 'Unknown' ! item['mem_usage'] = 'Unknown' ! result = item['exec_job_id'] ! if item['exec_id'] != exec_id: ! log.msg('The %s job MIGRATED from [Execid: %s] to [Execid: %s]' % (item['grid_id'], item['exec_id'], exec_id)) ! log.msg('Job REVOKED: %s'% item['grid_id']) ! except JobNotSuspended: ! item['status'] = BES_OVERALL_STATE_RUNNING ! item['cpu_usage'] = 'Unknown' ! log.msg('Job REVOKED: %s'% item['grid_id']) ! item['mem_usage'] = 'Unknown' ! result = item['exec_job_id'] ! log.msg('The %s Exec service is comes back, the % job is already running!' % (item['exec_id'], item['grid_id'])) ! except RevokeError, error: ! log.msg('I cannot repair this job: %s, reason: %s' % (item['grid_id'], error)) ! item['status'] = BES_OVERALL_STATE_NEW ! except: ! log.error() ! item['status'] = BES_OVERALL_STATE_NEW ! if result == False: ! result = self.execes[exec_id].start(item['desc']) ! except AttributeError, error: ! log.error("One or more attribute failures from the JSDL: %s", error) ! item['status'] = BES_OVERALL_STATE_TERMINATED ! log.error('Job TERMINATED: %s' % item['grid_id']) ! except NotAcceptingNewActivities, e: ! print 'Exec (%s) is rejected the job (%s)' % exec_id, nice_exception(e) ! self.dm.resource_list[exec_id][1]['FreeJobPlace'] -= 1 ! result = False ! except (socket.error, UnknownServiceError), error: ! print 'exec (%s) connection error:' % exec_id, nice_exception(error) ! self.dm.put_blacklist(exec_id) ! print 'exec (%s) is blacklisted' % exec_id ! except InitialDirectoryDoesNotExist, error: ! log.error("The job's (%s) initialdirectory does not exist: %s" % (item['grid_id'],error)) ! item['error_message'] = 'The job\'s initialdir does not exist: %s' % error ! item['status'] = BES_OVERALL_STATE_TERMINATED ! log.error('Job FAILED: %s' % item['grid_id']) except Exception, error: ! print 'cannot submit job (%s) : ' % item['grid_id'], nice_exception(error) ! ! if result == False: ! print 'cannot submit job', item['grid_id'] ! else: ! item['exec_job_id'] = result ! item['desc']['job_migration'] = item['desc']['job_migration'] + 1 ! item['exec_id'] = exec_id ! item['exec_submit_time'] = time.time() ! item['status'] = BES_OVERALL_STATE_RUNNING ! log.msg('Job %s submitted to %s (job ID on Exec is %s).' \ ! % (item['grid_id'], item['exec_id'], item['exec_job_id'])) ! self.cmids.commit(item['grid_id']) ! return result ! ! # def _sched(self): ! # """ ! # This procedure will schedule the jobs. It will ask for working proxies. ! # ! # _sched() ! # """ ! # ! # try: ! # self._sched_status() ! # self._sched_submit() ! # self._sched_delete() ! # self._get_exec_proxies() ! # self._timeout_test() ! # ! # good_proxies = self._choice_proxy() ! # if len(good_proxies) == 0: ! # return ! # ! # for item in self.cmids.select({'status' : BES_OVERALL_STATE_NEW}): ! # result = False ! # candidates = self._candidate_generator(good_proxies, item['grid_id']) ! # print 'list of candidates for job %s:' % item['grid_id'], candidates ! # ! # if len(candidates) == 0: ! # continue ! # ! # while len(candidates) > 0: ! # random.shuffle(candidates) ! # exec_id = candidates.pop() ! # if item['desc'].has_key('candidatehosts'): ! # if urlparse.urlparse(self.execes[exec_id]._url)[1].split(':')[0] \ ! # not in item['desc']['candidatehosts']: ! # print 'This host is not in the CandidateHosts list: %s' % self.execes[exec_id]._url ! # continue ! # try: ! # if item['exec_job_id'] != False and item['desc']['batchjob'] == False and item['desc']['checkpoint'] == '1': ! # try: ! # item['status'] = BES_OVERALL_STATE_MIGRATION ! # result = good_proxies[exec_id]['proxy'].revoke_repair(item['exec_job_id'], item['desc']) ! # if result == True: ! # item['status'] = BES_OVERALL_STATE_RUNNING ! # item['cpu_usage'] = 'Unknown' ! # item['mem_usage'] = 'Unknown' ! # result = item['exec_job_id'] ! # if item['exec_id'] != exec_id: ! # log.msg('The %s job MIGRATED from [Execid: %s] to [Execid: %s]' % (item['grid_id'], item['exec_id'], exec_id)) ! # log.msg('Job REVOKED: %s'% item['grid_id']) ! # except JobNotSuspended: ! # item['status'] = BES_OVERALL_STATE_RUNNING ! # item['cpu_usage'] = 'Unknown' ! # log.msg('Job REVOKED: %s'% item['grid_id']) ! # item['mem_usage'] = 'Unknown' ! # result = item['exec_job_id'] ! # log.msg('The %s Exec service is comes back, the % job is already running!' % (item['exec_id'], item['grid_id'])) ! # except RevokeError, error: ! # log.msg('I cannot repair this job: %s, reason: %s' % (item['grid_id'], error)) ! # item['status'] = BES_OVERALL_STATE_NEW ! # except: ! # log.error() ! # item['status'] = BES_OVERALL_STATE_NEW ! # ! # if result == False: ! # result = good_proxies[exec_id]['proxy'].start(item['desc']) ! # ! # if result != False: ! # good_proxies[exec_id]['free_job_place'] -= 1 ! # if good_proxies[exec_id]['free_job_place'] <= 0: ! # del good_proxies[exec_id] ! # print 'no more free_job_place on exec', exec_id ! # break ! # except AttributeError, error: ! # log.error("One or more attribute failures from the JSDL: %s", error) ! # item['status'] = BES_OVERALL_STATE_TERMINATED ! # log.error('Job TERMINATED: %s' % item['grid_id']) ! # except NotAcceptingNewActivities: ! # result = False ! # except InitialDirectoryDoesNotExist, error: ! # log.error("The job's (%s) initialdirectory does not exist: %s" % (item['grid_id'],error)) ! # item['error_message'] = 'The job\'s initialdir does not exist: %s' % error ! # item['status'] = BES_OVERALL_STATE_TERMINATED ! # log.error('Job FAILED: %s' % item['grid_id']) ! # except: ! # log.error() ! # ! # if result == False: ! # print 'cannot submit job', item['grid_id'] ! # else: ! # item['exec_job_id'] = result ! # item['desc']['job_migration'] = item['desc']['job_migration'] + 1 ! # item['exec_id'] = exec_id ! # item['exec_submit_time'] = time.time() ! # item['status'] = BES_OVERALL_STATE_RUNNING ! # log.msg('Job %s submitted to %s (job ID on Exec is %s).' \ ! # % (item['grid_id'], item['exec_id'], item['exec_job_id'])) ! # self.cmids.commit(item['grid_id']) ! # except Exception, error: ! # log.error() + #TODO: delete this, it is unused now def _choice_proxy(self): """ *************** *** 330,333 **** --- 502,506 ---- 'cpu_usage' : '-1', 'mem_usage':'-1', 'exit_code': None}) log.msg("New job: %s " % pid) + #print self.cmids.get(pid) return pid except UnknownActivityType, e: *************** *** 432,435 **** --- 605,611 ---- log.msg('Job deleted: %s' % job['grid_id']) continue + if self.dm.blacklisted(job['exec_id']): + print 'Exec (%s) is blacklisted (%s), cannot delete job' % \ + (job['exec_id'], self.dm.blacklist[job['exec_id ']]), job['grid_id'] retries = 3 while retries > 0: *************** *** 516,539 **** cpunum = 0 freecpunum = 0 ! for item in self.gis.get({'type' : 'Exec'}): try: ! data = SimpleXML(memory = item[1]) ! if data.get_content('/ServiceDescription/ClusterElement/CPUArchitecture')[0].strip() \ ! != self.arch and \ ! data.get_content('/ServiceDescription/ClusterElement/OperatingSystem')[0].strip() \ ! != self.os: ! continue ! try: ! client = get_working_client(item[0]['urls']) ! except: ! continue ! execes[item[0]['sid']] = client ! data = SimpleXML(memory = item[1]) ! cpucount = int(data.get_content('/ServiceDescription/ClusterElement/CPUCount')[0]) ! cpunum = cpunum + cpucount ! self.cpucount[item[0]['sid']] = cpucount ! freecpunum = freecpunum + int(data.get_content('/ServiceDescription/ClusterElement/FreeCpuNum')[0]) except Exception, e: ! print 'error on parsing Exec provision data', nice_exception(e) execeslog = ', '.join([e._url for e in self.execes.values()]) if execeslog != self.lastexeceslog: --- 692,706 ---- cpunum = 0 freecpunum = 0 ! for item in self.dm.resource_list.values(): try: ! client = get_client(item[0]['urls'][0]) except Exception, e: ! print 'Cannot reach exec (%s):' % item[0]['sid'], nice_exception(e) ! continue ! execes[item[0]['sid']] = client ! data = item[1] ! cpucount = int(data['CPUCount']) ! cpunum = cpunum + cpucount ! freecpunum = freecpunum + int(data['FreeCpuNum']) execeslog = ', '.join([e._url for e in self.execes.values()]) if execeslog != self.lastexeceslog: *************** *** 550,563 **** try: out = create_desc_from_jsdl(jsdl_str) ! print out #out['walltimelimit'] = 1209600 # 2 weeks out['ckpt_period'] = self.CKPTPeriod out['directory_structure'] = directory_structure out['job_migration'] = 0 ! print out except Exception, error: log.msg('Unfatal exception until parse jsdl: %s' % str(error)) ! print 'submit description:', out return out --- 717,731 ---- try: out = create_desc_from_jsdl(jsdl_str) ! #print out #out['walltimelimit'] = 1209600 # 2 weeks out['ckpt_period'] = self.CKPTPeriod out['directory_structure'] = directory_structure out['job_migration'] = 0 ! #print out except Exception, error: log.msg('Unfatal exception until parse jsdl: %s' % str(error)) + raise ! #print 'submit description:', out return out *************** *** 659,663 **** out[item]['completed'] = str(len(self.cmids.select({'status' : BES_OVERALL_STATE_COMPLETED, 'exec_id' : item}))) out[item]['jobnum'] = str(len(self.cmids.select({'exec_id' : item}))) ! out[item]['cpunum'] = self.cpucount[item] out[item]['new'] = str(len(self.cmids.select({'status' : BES_OVERALL_STATE_NEW, 'exec_id' : item}))) out[item]['terminated'] = str(len(self.cmids.select({'status' : BES_OVERALL_STATE_TERMINATED, 'exec_id' : item}))) --- 827,831 ---- out[item]['completed'] = str(len(self.cmids.select({'status' : BES_OVERALL_STATE_COMPLETED, 'exec_id' : item}))) out[item]['jobnum'] = str(len(self.cmids.select({'exec_id' : item}))) ! out[item]['cpunum'] = self.dm.resource_list[item][1]['CPUCount'] out[item]['new'] = str(len(self.cmids.select({'status' : BES_OVERALL_STATE_NEW, 'exec_id' : item}))) out[item]['terminated'] = str(len(self.cmids.select({'status' : BES_OVERALL_STATE_TERMINATED, 'exec_id' : item}))) |