[Gug-cvs] gug/gug/service/sched sched.py, 1.42, 1.43 sched.conf.default, 1.6, 1.7
Status: Planning
Brought to you by:
szferi
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:35
|
Update of /cvsroot/gug/gug/gug/service/sched
In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/service/sched

Modified Files:
sched.py sched.conf.default
Log Message:
Adding Decision Maker module
It is Integrated into the following services: Sched, CM, StM

Index: sched.conf.default
===================================================================
RCS file: /cvsroot/gug/gug/gug/service/sched/sched.conf.default,v
retrieving revision 1.6
retrieving revision 1.7
diff -C2 -d -r1.6 -r1.7
*** sched.conf.default 3 Apr 2007 16:51:50 -0000 1.6
--- sched.conf.default 27 Sep 2007 10:36:04 -0000 1.7
***************
*** 2,6 ****
  <ServiceConfig>
  <DecisionMaker>
! <Class>gug.service.sched.decision_maker.simple.DMSimpleRandom</Class>
  </DecisionMaker>
  <ResourceInterface>
--- 2,7 ----
  <ServiceConfig>
  <DecisionMaker>
! <ResourceInterface>gug.module.dm.resource.computing.SuperScheduler</ResourceInterface>
! <Algorithm>gug.module.dm.algorithm.simple.RandomOrder</Algorithm>
  </DecisionMaker>
  <ResourceInterface>

Index: sched.py
===================================================================
RCS file: /cvsroot/gug/gug/gug/service/sched/sched.py,v
retrieving revision 1.42
retrieving revision 1.43
diff -C2 -d -r1.42 -r1.43
*** sched.py 7 May 2007 13:26:53 -0000 1.42
--- sched.py 27 Sep 2007 10:36:04 -0000 1.43
***************
*** 1,311 ****
! """ Super Scheduler Service """
! __revision__ = '$Revision$'
!
! from gug.common.config import Config, as_boolean
! from gug.common.share import import_class_from_string, mkuid, \
! random_name, nice_exception, gis_proxy, create_provision_task
! from gug.common import log
! from gug.client.storage import Storage
! from gug.common.ogsa_bes import *
! from gug.common.compiler_states import *
! from gug.common.exception import *
! from gug.common.jsdl import *
! from gug.common.simplexml import SimpleXML, SimpleXMLError
! from gug.host.timed import Task
! from gug.client.soap import get_client
!
from gug.XFile import XFile, LOCK_EX, LOCK_SH ! from gug.common.idstore import IDStore ! from gug.common.jsdl_parser import * ! ! import os ! import time ! import shutil ! import socket ! import base64 ! ! JC_ID_UNKNOWN = 'Unknown' ! ANONYMOUS = 'anonymous' ! ROADWARRIOR = '/CN=Roadwarrior/emailAddress=sz...@ni...' ! ! class SuperScheduler: ! sched_run = False ! ! def __init__(self, initdata): ! self.gis_proxy = gis_proxy(initdata['ctl']) ! self.storage = Storage(gis_proxy = self.gis_proxy) ! self.id = initdata['sid'] ! self.config = Config(initdata['ConfigFile']) ! path = '/ServiceConfig/AllowAnonymous' ! self.allow_anonymous = as_boolean(self.config.get_content(path, '0')[0]) ! ! self.storage_path = self.config.get_content('/ServiceConfig/DBConnect')[0] ! self.schedids = IDStore(self.storage_path) ! ! # Init descision maker backend ! path = '/ServiceConfig/DecisionMaker' ! dm_cfg = self.config.get_dictionaries(path)[0] ! dm_class = import_class_from_string(dm_cfg['Class']) ! self.decision_maker = dm_class(self.gis_proxy, dm_cfg) ! ! # Init resource interface backend ! path = '/ServiceConfig/ResourceInterface' ! ri_cfg = self.config.get_dictionaries(path)[0] ! ri_class = import_class_from_string(ri_cfg['Class']) ! self.resource = ri_class(self.gis_proxy, ri_cfg) ! ! # Init scheduler task ! path = '/ServiceConfig/SchedulerPeriod' ! sched_period = self.config.get_content(path) ! Task(self.id, self._sched).start(int(sched_period[0])) ! ! path = '/ServiceConfig/SuperUserDN' ! self.sudns = self.config.get_content(path,[]) ! if not self.sudns: ! log.error('No SuperUserDN found.') ! ! path = '/ServiceConfig/MonitorUserDN' ! self.monitordns = self.config.get_content(path,[]) ! if self.monitordns: ! log.msg('MonitorUserDN found:', self.monitordns) ! ! self._set_service_name() ! create_provision_task(initdata, self._get_description, message_for_logging = 'Sched') ! ! ! ! def _set_service_name(self): ! self.longname = "Super Scheduler Service" ! ! def _sched_delete(self): ! 
# Select all job wich is in DELETED state ! ids = self.schedids.select({'state' : BES_OVERALL_STATE_KILLED }) ! if len(ids) == 0: ! print 'there is no deleted job' ! return False ! self.resource.remove(ids, self.schedids) ! ! def _sched_status(self): ! idsel = self.schedids.select({}) ! ids = [ id for id in idsel if id['state'] not in \ ! [ BES_OVERALL_STATE_DONE, BES_OVERALL_STATE_KILLED ] ] ! if len(ids) == 0: ! print 'there is no running job' ! return False ! ! self.resource.status(ids, self.schedids) ! ! def _sched_submit(self): ! ids = self.resource.get_new_jobs(self.schedids) ! if ids: ! rlist = self.resource.get_valid_resources() ! for resource_id, ids in self.decision_maker.decision(ids, rlist).iteritems(): ! try: ! self.resource.submit(resource_id, ids, self.schedids) ! except Exception, error: ! log.error('Cannot submit to', resource_id, nice_exception(error)) ! continue ! ! def _sched(self): ! """ Check lock for scheduler task and run real function """ ! ! if self.sched_run is True: ! print 'SuperScheduler._sched is still running...' ! else: ! self.sched_run = True ! print 'SuperScheduler._sched is starting...' ! self.resource.get_resources() ! if len(self.resource.resource_list) > 0: ! self._sched_submit() ! self._sched_status() ! self._sched_delete() ! print 'SuperScheduler._sched is done.' ! self.sched_run = False ! ! def _check_auth(self, auth): ! print auth ! if not self.allow_anonymous: ! #if auth == None: ! # raise UserNotAllowedException, 'no auth' ! #subject = auth.get_subject() ! #if subject == None: ! # raise UserNotAllowedException, 'no subject' ! #if subject.CN == None: ! # raise UserNotAllowedException, 'no CN' ! #if subject.CN == '': ! # raise UserNotAllowedException, 'empty CN' ! #if subject.CN == 'Roadwarrior': ! # raise UserNotAllowedException, 'Roadwarrior not allowed' ! if not auth['http'] and not auth['x509']: ! raise UserNotAllowedException, 'not authenticated' ! try: ! if auth['x509']: ! owner = str(auth['x509'].subject()) ! 
elif auth['http']: ! type, data = auth['http'].split(' ',1) ! if type == 'Basic': ! user, pw = base64.decodestring(data).split(':') ! owner = user ! else: ! raise UserNotAllowedException, 'not supported authentication method' ! else: ! owner = ANONYMOUS ! except UserNotAllowedException: ! raise ! except: ! log.error() ! owner = ANONYMOUS ! return owner ! ! def _own(self, owner, id): ! if not id.has_key('owner'): ! return False ! if id['owner'] in [None, ANONYMOUS, ROADWARRIOR] and \ ! owner in [None, ANONYMOUS, ROADWARRIOR]: ! return True ! if id['owner'] == None: ! return False ! return owner == id['owner'] ! ! def _cleanup(self): ! log.msg('Cleanup of Super Scheduler.') ! ! def _create_activity(self, grid_id, owner, jsdl_doc): ! try: ! stat, jobtype = get_job_type(jsdl_doc) ! ! if (not stat) or ( jobtype.upper() not in applicationtype_list): ! log.error("Error: %s type job sent to Super Scheduler Service" % jobtype) ! return False ! ! xml = SimpleXML(memory = jsdl_doc) ! nss = {'jsdl' : "http://schemas.ggf.org/jsdl/2005/11/jsdl"} ! jobname = xml.get_content('//jsdl:JobName', nss = nss)[0] ! id = { ! 'grid_id' : grid_id, ! 'jsdl' : jsdl_doc, ! 'state' : BES_OVERALL_STATE_NEW, ! 'owner' : owner, ! 'jobname' : jobname, ! 'sched_submit_time' : time.time(), ! 'status_long' : None, ! 'jobtype' : jobtype, ! 'run_arch' : None, ! 'run_os' : None } ! self.schedids.add(id) ! except Exception, e: ! log.error() ! ! return True ! ! # Other name of Submit job ! def CreateActivityFromJSDL(self, auth, jsdl_doc): ! owner = self._check_auth(auth) ! grid_id = self.id + '/' + mkuid() ! log.msg('New job from %s:' % owner, grid_id) ! if self._create_activity(grid_id, owner, jsdl_doc): ! return grid_id ! else: ! return None ! ! def _make_ActivityJSDLDocument(self, grid_id, jsdldoc): ! x = SimpleXML(memory = jsdldoc) ! x.get_node('/')[0].childNodes[0].tagName = 'ActivityJSDLDocument' ! x.insert_child('ActivityIdentifier',grid_id) ! s = str(x) ! del x ! return s ! ! 
def GetActivityJSDLDocuments(self, auth, grid_ids): ! owner = self._check_auth(auth) ! if len(grid_ids) == 0: ! # Get all job ! ids = self.schedids.select({}) ! else: ! ids = self.schedids.select_by_id_list(grid_ids) ! if owner not in self.sudns: ! # Filter user's jobs ! ids = [id for id in ids if self._own(owner,id)] ! return [self._make_ActivityJSDLDocument(id['grid_id'],id['jsdl']) \ ! for id in ids] ! ! def GetActivityStatus(self, auth, grid_ids): ! if isinstance(grid_ids, str): ! grid_ids = [grid_ids] ! owner = self._check_auth(auth) ! if len(grid_ids) == 0: ! # Get all job ! selects = self.schedids.select({}) ! else: ! selects = self.schedids.select_by_id_list(grid_ids) ! ! if owner not in self.sudns and owner not in self.monitordns: ! # Filter user's jobs ! selects = [id for id in selects if self._own(owner,id)] ! return [ self.resource.get_status(id) for id in selects ] ! ! def _stage_status_to_xml(self,inout,stages): ! return ''.join(['<Stage%sStatus id="%s" state="%s"/>' \ ! % (inout, name, data['state']) \ ! for name, data in stages.items()]) ! ! def _state_change(self, owner, state_change_request): ! state_change_response = [] ! if len(state_change_request) == 0: ! pass ! else: ! for request in state_change_request: ! #print 'state change request:', request ! if request[0] == '': ! selects = self.schedids.select({'owner' : owner}) ! for s in selects: ! s['state'] = int(request[1]['to']) ! state_change_response.append(\ ! (s['grid_id'], 'Succeeded')) ! s['status_long'] = '' ! self.schedids.commit(s['grid_id']) ! else: ! try: ! id = self.schedids.get(request[0]) ! if owner not in self.sudns: ! if not self._own(owner,id): ! state_change_response.append((request[0],'Not allowed')) ! continue ! id['state'] = int(request[1]['to']) ! state_change_response.append((request[0],'Succeeded')) ! id['status_long'] = '' ! self.schedids.commit(id['grid_id']) ! except: ! state_change_response.append((request[0],'Not found')) ! return state_change_response ! ! 
def RequestActivityStateChanges(self, auth, requests): ! owner = self._check_auth(auth) ! state_change_request = [] ! for request in requests: ! req_xml = SimpleXML(memory=request) ! state_change_request.append( \ ! (req_xml.get_content('//ActivityIdentifier',[''])[0], \ ! req_xml.get_attributes('//RequestedStateChange')[0])) ! #print 'state changes:', state_change_request ! return [\ ! ('<StateChangeResponse>' + \ ! '<ActivityIdentifier>%s</ActivityIdentifier>' + \ ! '<Response>%s</Response>' + \ ! '</StateChangeResponse>') \ ! % (id, resp) for (id, resp) \ ! in self._state_change (owner, state_change_request)] ! ! def _get_description(self, site_id): ! return """<?xml version='1.0'?> ! <ServiceDescription> ! <LongName>%s</LongName> ! <MajorVersion>0</MajorVersion> ! <MinorVersion>1</MinorVersion> ! <PatchVersion>0</PatchVersion> ! <Site>%s</Site> ! <WSDL></WSDL> ! <Semantics> ! Provide OGSA BES compatiblie interface to grid level ! job management ! </Semantics> ! </ServiceDescription>""" % (self.longname, site_id) --- 1,329 ---- ! """ Super Scheduler Service """ ! __revision__ = '$Revision$' ! ! from gug.common.config import Config, as_boolean ! from gug.common.share import import_class_from_string, mkuid, \ ! random_name, nice_exception, gis_proxy, create_provision_task ! from gug.common import log ! from gug.client.storage import Storage ! from gug.common.ogsa_bes import * ! from gug.common.compiler_states import * ! from gug.common.exception import * ! from gug.common.jsdl import * ! from gug.common.simplexml import SimpleXML, SimpleXMLError ! from gug.host.timed import Task ! from gug.client.soap import get_client ! from gug.XFile import XFile, LOCK_EX, LOCK_SH ! from gug.common.idstore import IDStore ! from gug.common.jsdl_parser import * ! from gug.module.dm.dm import DecisionMaker ! ! import os ! import time ! import shutil ! import socket ! import base64 ! ! JC_ID_UNKNOWN = 'Unknown' ! ANONYMOUS = 'anonymous' ! 
ROADWARRIOR = '/CN=Roadwarrior/emailAddress=sz...@ni...' ! ! class SuperScheduler: ! sched_run = False ! ! def __init__(self, initdata): ! self.gis_proxy = gis_proxy(initdata['ctl']) ! self.storage = Storage(gis_proxy = self.gis_proxy) ! self.id = initdata['sid'] ! self.config = Config(initdata['ConfigFile']) ! path = '/ServiceConfig/AllowAnonymous' ! self.allow_anonymous = as_boolean(self.config.get_content(path, '0')[0]) ! ! self.storage_path = self.config.get_content('/ServiceConfig/DBConnect')[0] ! self.schedids = IDStore(self.storage_path) ! ! # Init descision maker backend ! path = '/ServiceConfig/DecisionMaker' ! dm_cfg = self.config.get_dictionaries(path)[0] ! #dm_class = import_class_from_string(dm_cfg['Class']) ! #self.decision_maker = dm_class(self.gis_proxy, dm_cfg) ! self.dm = DecisionMaker(self.gis_proxy, dm_cfg) ! ! # Init resource interface backend ! path = '/ServiceConfig/ResourceInterface' ! ri_cfg = self.config.get_dictionaries(path)[0] ! ri_class = import_class_from_string(ri_cfg['Class']) ! self.resource = ri_class(self.gis_proxy, self.dm, ri_cfg) ! ! # Init scheduler task ! path = '/ServiceConfig/SchedulerPeriod' ! sched_period = self.config.get_content(path) ! Task(self.id, self._sched).start(int(sched_period[0])) ! ! path = '/ServiceConfig/SuperUserDN' ! self.sudns = self.config.get_content(path,[]) ! if not self.sudns: ! log.error('No SuperUserDN found.') ! ! path = '/ServiceConfig/MonitorUserDN' ! self.monitordns = self.config.get_content(path,[]) ! if self.monitordns: ! log.msg('MonitorUserDN found:', self.monitordns) ! ! self._set_service_name() ! create_provision_task(initdata, self._get_description, message_for_logging = 'Sched') ! ! ! ! def _set_service_name(self): ! self.longname = "Super Scheduler Service" ! ! def _sched_delete(self): ! # Select all job wich is in DELETED state ! ids = self.schedids.select({'state' : BES_OVERALL_STATE_KILLED }) ! if len(ids) == 0: ! print 'there is no deleted job' ! return False ! 
self.resource.remove(ids, self.schedids) ! ! def _sched_status(self): ! idsel = self.schedids.select({}) ! ids = [ id for id in idsel if id['state'] not in \ ! [ BES_OVERALL_STATE_DONE, BES_OVERALL_STATE_KILLED ] ] ! if len(ids) == 0: ! print 'there is no running job' ! return False ! ! self.resource.status(ids, self.schedids) ! ! # def _sched_submit(self): ! # ids = self.resource.get_new_jobs(self.schedids) ! # if ids: ! # rlist = self.resource.get_valid_resources() ! # for resource_id, ids in self.decision_maker.decision(ids, rlist).iteritems(): ! # try: ! # self.resource.submit(resource_id, ids, self.schedids) ! # except Exception, error: ! # log.error('Cannot submit to', resource_id, nice_exception(error)) ! # continue ! ! def _sched_submit(self): ! map = self.dm.make_decisions(self.schedids) ! while len(map) > 0: ! res_jobs = {} ! try: ! for (job_id, candidate) in map.iteritems(): ! res = candidate.next() ! if not res_jobs.has_key(res): ! res_jobs[res] = [] ! res_jobs[res].append(self.schedids.get(job_id)) ! except: ! del map[job_id] ! for (resource_id, ids) in res_jobs.iteritems(): ! if self.resource.submit(resource_id, ids, self.schedids): ! for id in ids: ! del map[id['grid_id']] ! ! def _sched(self): ! """ Check lock for scheduler task and run real function """ ! if self.sched_run is True: ! print 'SuperScheduler._sched is still running...' ! else: ! self.sched_run = True ! print 'SuperScheduler._sched is starting...' ! self.dm.query_resources() ! self._sched_status() ! if len(self.dm.resource_list) > 0: ! self._sched_submit() ! self._sched_delete() ! print 'SuperScheduler._sched is done.' ! self.sched_run = False ! ! def _check_auth(self, auth): ! print auth ! if not self.allow_anonymous: ! #if auth == None: ! # raise UserNotAllowedException, 'no auth' ! #subject = auth.get_subject() ! #if subject == None: ! # raise UserNotAllowedException, 'no subject' ! #if subject.CN == None: ! # raise UserNotAllowedException, 'no CN' ! #if subject.CN == '': ! 
# raise UserNotAllowedException, 'empty CN' ! #if subject.CN == 'Roadwarrior': ! # raise UserNotAllowedException, 'Roadwarrior not allowed' ! if not auth['http'] and not auth['x509']: ! raise UserNotAllowedException, 'not authenticated' ! try: ! if auth['x509']: ! owner = str(auth['x509'].subject()) ! elif auth['http']: ! type, data = auth['http'].split(' ',1) ! if type == 'Basic': ! user, pw = base64.decodestring(data).split(':') ! owner = user ! else: ! raise UserNotAllowedException, 'not supported authentication method' ! else: ! owner = ANONYMOUS ! except UserNotAllowedException: ! raise ! except: ! log.error() ! owner = ANONYMOUS ! return owner ! ! def _own(self, owner, id): ! if not id.has_key('owner'): ! return False ! if id['owner'] in [None, ANONYMOUS, ROADWARRIOR] and \ ! owner in [None, ANONYMOUS, ROADWARRIOR]: ! return True ! if id['owner'] == None: ! return False ! return owner == id['owner'] ! ! def _cleanup(self): ! log.msg('Cleanup of Super Scheduler.') ! ! def _create_activity(self, grid_id, owner, jsdl_doc): ! try: ! stat, jobtype = get_job_type(jsdl_doc) ! ! if (not stat) or ( jobtype.upper() not in applicationtype_list): ! log.error("Error: %s type job sent to Super Scheduler Service" % jobtype) ! return False ! ! xml = SimpleXML(memory = jsdl_doc) ! nss = {'jsdl' : "http://schemas.ggf.org/jsdl/2005/11/jsdl"} ! jobname = xml.get_content('//jsdl:JobName', nss = nss)[0] ! id = { ! 'grid_id' : grid_id, ! 'jsdl' : jsdl_doc, ! 'state' : BES_OVERALL_STATE_NEW, ! 'owner' : owner, ! 'jobname' : jobname, ! 'sched_submit_time' : time.time(), ! 'status_long' : None, ! 'jobtype' : jobtype, ! 'run_arch' : None, ! 'run_os' : None } ! self.schedids.add(id) ! except Exception, e: ! log.error() ! ! return True ! ! # Other name of Submit job ! def CreateActivityFromJSDL(self, auth, jsdl_doc): ! owner = self._check_auth(auth) ! grid_id = self.id + '/' + mkuid() ! log.msg('New job from %s:' % owner, grid_id) ! if self._create_activity(grid_id, owner, jsdl_doc): ! 
return grid_id ! else: ! return None ! ! def _make_ActivityJSDLDocument(self, grid_id, jsdldoc): ! x = SimpleXML(memory = jsdldoc) ! x.get_node('/')[0].childNodes[0].tagName = 'ActivityJSDLDocument' ! x.insert_child('ActivityIdentifier',grid_id) ! s = str(x) ! del x ! return s ! ! def GetActivityJSDLDocuments(self, auth, grid_ids): ! owner = self._check_auth(auth) ! if len(grid_ids) == 0: ! # Get all job ! ids = self.schedids.select({}) ! else: ! ids = self.schedids.select_by_id_list(grid_ids) ! if owner not in self.sudns: ! # Filter user's jobs ! ids = [id for id in ids if self._own(owner,id)] ! return [self._make_ActivityJSDLDocument(id['grid_id'],id['jsdl']) \ ! for id in ids] ! ! def GetActivityStatus(self, auth, grid_ids): ! if isinstance(grid_ids, str): ! grid_ids = [grid_ids] ! owner = self._check_auth(auth) ! if len(grid_ids) == 0: ! # Get all job ! selects = self.schedids.select({}) ! else: ! selects = self.schedids.select_by_id_list(grid_ids) ! ! if owner not in self.sudns and owner not in self.monitordns: ! # Filter user's jobs ! selects = [id for id in selects if self._own(owner,id)] ! return [ self.resource.get_status(id) for id in selects ] ! ! def _stage_status_to_xml(self,inout,stages): ! return ''.join(['<Stage%sStatus id="%s" state="%s"/>' \ ! % (inout, name, data['state']) \ ! for name, data in stages.items()]) ! ! def _state_change(self, owner, state_change_request): ! state_change_response = [] ! if len(state_change_request) == 0: ! pass ! else: ! for request in state_change_request: ! #print 'state change request:', request ! if request[0] == '': ! selects = self.schedids.select({'owner' : owner}) ! for s in selects: ! s['state'] = int(request[1]['to']) ! state_change_response.append(\ ! (s['grid_id'], 'Succeeded')) ! s['status_long'] = '' ! self.schedids.commit(s['grid_id']) ! else: ! try: ! id = self.schedids.get(request[0]) ! if owner not in self.sudns: ! if not self._own(owner,id): ! 
state_change_response.append((request[0],'Not allowed')) ! continue ! id['state'] = int(request[1]['to']) ! state_change_response.append((request[0],'Succeeded')) ! id['status_long'] = '' ! self.schedids.commit(id['grid_id']) ! except: ! state_change_response.append((request[0],'Not found')) ! return state_change_response ! ! def RequestActivityStateChanges(self, auth, requests): ! owner = self._check_auth(auth) ! state_change_request = [] ! for request in requests: ! req_xml = SimpleXML(memory=request) ! state_change_request.append( \ ! (req_xml.get_content('//ActivityIdentifier',[''])[0], \ ! req_xml.get_attributes('//RequestedStateChange')[0])) ! #print 'state changes:', state_change_request ! return [\ ! ('<StateChangeResponse>' + \ ! '<ActivityIdentifier>%s</ActivityIdentifier>' + \ ! '<Response>%s</Response>' + \ ! '</StateChangeResponse>') \ ! % (id, resp) for (id, resp) \ ! in self._state_change (owner, state_change_request)] ! ! def _get_description(self, site_id): ! return """<?xml version='1.0'?> ! <ServiceDescription> ! <LongName>%s</LongName> ! <MajorVersion>0</MajorVersion> ! <MinorVersion>1</MinorVersion> ! <PatchVersion>0</PatchVersion> ! <Site>%s</Site> ! <WSDL></WSDL> ! <Semantics> ! Provide OGSA BES compatiblie interface to grid level ! job management ! </Semantics> ! </ServiceDescription>""" % (self.longname, site_id) |