gug-cvs Mailing List for Grid Underground
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:35
|
Update of /cvsroot/gug/gug/gug/service/sched In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/service/sched Modified Files: sched.py sched.conf.default Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM Index: sched.conf.default =================================================================== RCS file: /cvsroot/gug/gug/gug/service/sched/sched.conf.default,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** sched.conf.default 3 Apr 2007 16:51:50 -0000 1.6 --- sched.conf.default 27 Sep 2007 10:36:04 -0000 1.7 *************** *** 2,6 **** <ServiceConfig> <DecisionMaker> ! <Class>gug.service.sched.decision_maker.simple.DMSimpleRandom</Class> </DecisionMaker> <ResourceInterface> --- 2,7 ---- <ServiceConfig> <DecisionMaker> ! <ResourceInterface>gug.module.dm.resource.computing.SuperScheduler</ResourceInterface> ! <Algorithm>gug.module.dm.algorithm.simple.RandomOrder</Algorithm> </DecisionMaker> <ResourceInterface> Index: sched.py =================================================================== RCS file: /cvsroot/gug/gug/gug/service/sched/sched.py,v retrieving revision 1.42 retrieving revision 1.43 diff -C2 -d -r1.42 -r1.43 *** sched.py 7 May 2007 13:26:53 -0000 1.42 --- sched.py 27 Sep 2007 10:36:04 -0000 1.43 *************** *** 1,311 **** ! """ Super Scheduler Service """ ! __revision__ = '$Revision$' ! ! from gug.common.config import Config, as_boolean ! from gug.common.share import import_class_from_string, mkuid, \ ! random_name, nice_exception, gis_proxy, create_provision_task ! from gug.common import log ! from gug.client.storage import Storage ! from gug.common.ogsa_bes import * ! from gug.common.compiler_states import * ! from gug.common.exception import * ! from gug.common.jsdl import * ! from gug.common.simplexml import SimpleXML, SimpleXMLError ! from gug.host.timed import Task ! from gug.client.soap import get_client ! from gug.XFile import XFile, LOCK_EX, LOCK_SH ! from gug.common.idstore import IDStore ! from gug.common.jsdl_parser import * ! ! import os ! import time ! import shutil ! import socket ! import base64 ! ! JC_ID_UNKNOWN = 'Unknown' ! ANONYMOUS = 'anonymous' ! ROADWARRIOR = '/CN=Roadwarrior/emailAddress=sz...@ni...' ! ! class SuperScheduler: ! sched_run = False ! ! def __init__(self, initdata): ! self.gis_proxy = gis_proxy(initdata['ctl']) ! self.storage = Storage(gis_proxy = self.gis_proxy) ! self.id = initdata['sid'] ! self.config = Config(initdata['ConfigFile']) ! path = '/ServiceConfig/AllowAnonymous' ! self.allow_anonymous = as_boolean(self.config.get_content(path, '0')[0]) ! ! self.storage_path = self.config.get_content('/ServiceConfig/DBConnect')[0] ! self.schedids = IDStore(self.storage_path) ! ! # Init descision maker backend ! path = '/ServiceConfig/DecisionMaker' ! dm_cfg = self.config.get_dictionaries(path)[0] ! dm_class = import_class_from_string(dm_cfg['Class']) ! self.decision_maker = dm_class(self.gis_proxy, dm_cfg) ! ! # Init resource interface backend ! path = '/ServiceConfig/ResourceInterface' ! ri_cfg = self.config.get_dictionaries(path)[0] ! ri_class = import_class_from_string(ri_cfg['Class']) ! self.resource = ri_class(self.gis_proxy, ri_cfg) ! ! # Init scheduler task ! path = '/ServiceConfig/SchedulerPeriod' ! sched_period = self.config.get_content(path) ! Task(self.id, self._sched).start(int(sched_period[0])) ! ! path = '/ServiceConfig/SuperUserDN' ! self.sudns = self.config.get_content(path,[]) ! if not self.sudns: ! 
log.error('No SuperUserDN found.') ! ! path = '/ServiceConfig/MonitorUserDN' ! self.monitordns = self.config.get_content(path,[]) ! if self.monitordns: ! log.msg('MonitorUserDN found:', self.monitordns) ! ! self._set_service_name() ! create_provision_task(initdata, self._get_description, message_for_logging = 'Sched') ! ! ! ! def _set_service_name(self): ! self.longname = "Super Scheduler Service" ! ! def _sched_delete(self): ! # Select all job wich is in DELETED state ! ids = self.schedids.select({'state' : BES_OVERALL_STATE_KILLED }) ! if len(ids) == 0: ! print 'there is no deleted job' ! return False ! self.resource.remove(ids, self.schedids) ! ! def _sched_status(self): ! idsel = self.schedids.select({}) ! ids = [ id for id in idsel if id['state'] not in \ ! [ BES_OVERALL_STATE_DONE, BES_OVERALL_STATE_KILLED ] ] ! if len(ids) == 0: ! print 'there is no running job' ! return False ! ! self.resource.status(ids, self.schedids) ! ! def _sched_submit(self): ! ids = self.resource.get_new_jobs(self.schedids) ! if ids: ! rlist = self.resource.get_valid_resources() ! for resource_id, ids in self.decision_maker.decision(ids, rlist).iteritems(): ! try: ! self.resource.submit(resource_id, ids, self.schedids) ! except Exception, error: ! log.error('Cannot submit to', resource_id, nice_exception(error)) ! continue ! ! def _sched(self): ! """ Check lock for scheduler task and run real function """ ! ! if self.sched_run is True: ! print 'SuperScheduler._sched is still running...' ! else: ! self.sched_run = True ! print 'SuperScheduler._sched is starting...' ! self.resource.get_resources() ! if len(self.resource.resource_list) > 0: ! self._sched_submit() ! self._sched_status() ! self._sched_delete() ! print 'SuperScheduler._sched is done.' ! self.sched_run = False ! ! def _check_auth(self, auth): ! print auth ! if not self.allow_anonymous: ! #if auth == None: ! # raise UserNotAllowedException, 'no auth' ! #subject = auth.get_subject() ! #if subject == None: ! # raise UserNotAllowedException, 'no subject' ! #if subject.CN == None: ! # raise UserNotAllowedException, 'no CN' ! #if subject.CN == '': ! # raise UserNotAllowedException, 'empty CN' ! #if subject.CN == 'Roadwarrior': ! # raise UserNotAllowedException, 'Roadwarrior not allowed' ! if not auth['http'] and not auth['x509']: ! raise UserNotAllowedException, 'not authenticated' ! try: ! if auth['x509']: ! owner = str(auth['x509'].subject()) ! elif auth['http']: ! type, data = auth['http'].split(' ',1) ! if type == 'Basic': ! user, pw = base64.decodestring(data).split(':') ! owner = user ! else: ! raise UserNotAllowedException, 'not supported authentication method' ! else: ! owner = ANONYMOUS ! except UserNotAllowedException: ! raise ! except: ! log.error() ! owner = ANONYMOUS ! return owner ! ! def _own(self, owner, id): ! if not id.has_key('owner'): ! return False ! if id['owner'] in [None, ANONYMOUS, ROADWARRIOR] and \ ! owner in [None, ANONYMOUS, ROADWARRIOR]: ! return True ! if id['owner'] == None: ! return False ! return owner == id['owner'] ! ! def _cleanup(self): ! log.msg('Cleanup of Super Scheduler.') ! ! def _create_activity(self, grid_id, owner, jsdl_doc): ! try: ! stat, jobtype = get_job_type(jsdl_doc) ! ! if (not stat) or ( jobtype.upper() not in applicationtype_list): ! log.error("Error: %s type job sent to Super Scheduler Service" % jobtype) ! return False ! ! xml = SimpleXML(memory = jsdl_doc) ! nss = {'jsdl' : "http://schemas.ggf.org/jsdl/2005/11/jsdl"} ! jobname = xml.get_content('//jsdl:JobName', nss = nss)[0] ! id = { ! 
'grid_id' : grid_id, ! 'jsdl' : jsdl_doc, ! 'state' : BES_OVERALL_STATE_NEW, ! 'owner' : owner, ! 'jobname' : jobname, ! 'sched_submit_time' : time.time(), ! 'status_long' : None, ! 'jobtype' : jobtype, ! 'run_arch' : None, ! 'run_os' : None } ! self.schedids.add(id) ! except Exception, e: ! log.error() ! ! return True ! ! # Other name of Submit job ! def CreateActivityFromJSDL(self, auth, jsdl_doc): ! owner = self._check_auth(auth) ! grid_id = self.id + '/' + mkuid() ! log.msg('New job from %s:' % owner, grid_id) ! if self._create_activity(grid_id, owner, jsdl_doc): ! return grid_id ! else: ! return None ! ! def _make_ActivityJSDLDocument(self, grid_id, jsdldoc): ! x = SimpleXML(memory = jsdldoc) ! x.get_node('/')[0].childNodes[0].tagName = 'ActivityJSDLDocument' ! x.insert_child('ActivityIdentifier',grid_id) ! s = str(x) ! del x ! return s ! ! def GetActivityJSDLDocuments(self, auth, grid_ids): ! owner = self._check_auth(auth) ! if len(grid_ids) == 0: ! # Get all job ! ids = self.schedids.select({}) ! else: ! ids = self.schedids.select_by_id_list(grid_ids) ! if owner not in self.sudns: ! # Filter user's jobs ! ids = [id for id in ids if self._own(owner,id)] ! return [self._make_ActivityJSDLDocument(id['grid_id'],id['jsdl']) \ ! for id in ids] ! ! def GetActivityStatus(self, auth, grid_ids): ! if isinstance(grid_ids, str): ! grid_ids = [grid_ids] ! owner = self._check_auth(auth) ! if len(grid_ids) == 0: ! # Get all job ! selects = self.schedids.select({}) ! else: ! selects = self.schedids.select_by_id_list(grid_ids) ! ! if owner not in self.sudns and owner not in self.monitordns: ! # Filter user's jobs ! selects = [id for id in selects if self._own(owner,id)] ! return [ self.resource.get_status(id) for id in selects ] ! ! def _stage_status_to_xml(self,inout,stages): ! return ''.join(['<Stage%sStatus id="%s" state="%s"/>' \ ! % (inout, name, data['state']) \ ! for name, data in stages.items()]) ! ! def _state_change(self, owner, state_change_request): ! state_change_response = [] ! if len(state_change_request) == 0: ! pass ! else: ! for request in state_change_request: ! #print 'state change request:', request ! if request[0] == '': ! selects = self.schedids.select({'owner' : owner}) ! for s in selects: ! s['state'] = int(request[1]['to']) ! state_change_response.append(\ ! (s['grid_id'], 'Succeeded')) ! s['status_long'] = '' ! self.schedids.commit(s['grid_id']) ! else: ! try: ! id = self.schedids.get(request[0]) ! if owner not in self.sudns: ! if not self._own(owner,id): ! state_change_response.append((request[0],'Not allowed')) ! continue ! id['state'] = int(request[1]['to']) ! state_change_response.append((request[0],'Succeeded')) ! id['status_long'] = '' ! self.schedids.commit(id['grid_id']) ! except: ! state_change_response.append((request[0],'Not found')) ! return state_change_response ! ! def RequestActivityStateChanges(self, auth, requests): ! owner = self._check_auth(auth) ! state_change_request = [] ! for request in requests: ! req_xml = SimpleXML(memory=request) ! state_change_request.append( \ ! (req_xml.get_content('//ActivityIdentifier',[''])[0], \ ! req_xml.get_attributes('//RequestedStateChange')[0])) ! #print 'state changes:', state_change_request ! return [\ ! ('<StateChangeResponse>' + \ ! '<ActivityIdentifier>%s</ActivityIdentifier>' + \ ! '<Response>%s</Response>' + \ ! '</StateChangeResponse>') \ ! % (id, resp) for (id, resp) \ ! in self._state_change (owner, state_change_request)] ! ! def _get_description(self, site_id): ! return """<?xml version='1.0'?> ! 
<ServiceDescription> ! <LongName>%s</LongName> ! <MajorVersion>0</MajorVersion> ! <MinorVersion>1</MinorVersion> ! <PatchVersion>0</PatchVersion> ! <Site>%s</Site> ! <WSDL></WSDL> ! <Semantics> ! Provide OGSA BES compatiblie interface to grid level ! job management ! </Semantics> ! </ServiceDescription>""" % (self.longname, site_id) --- 1,329 ---- ! """ Super Scheduler Service """ ! __revision__ = '$Revision$' ! ! from gug.common.config import Config, as_boolean ! from gug.common.share import import_class_from_string, mkuid, \ ! random_name, nice_exception, gis_proxy, create_provision_task ! from gug.common import log ! from gug.client.storage import Storage ! from gug.common.ogsa_bes import * ! from gug.common.compiler_states import * ! from gug.common.exception import * ! from gug.common.jsdl import * ! from gug.common.simplexml import SimpleXML, SimpleXMLError ! from gug.host.timed import Task ! from gug.client.soap import get_client ! from gug.XFile import XFile, LOCK_EX, LOCK_SH ! from gug.common.idstore import IDStore ! from gug.common.jsdl_parser import * ! from gug.module.dm.dm import DecisionMaker ! ! import os ! import time ! import shutil ! import socket ! import base64 ! ! JC_ID_UNKNOWN = 'Unknown' ! ANONYMOUS = 'anonymous' ! ROADWARRIOR = '/CN=Roadwarrior/emailAddress=sz...@ni...' ! ! class SuperScheduler: ! sched_run = False ! ! def __init__(self, initdata): ! self.gis_proxy = gis_proxy(initdata['ctl']) ! self.storage = Storage(gis_proxy = self.gis_proxy) ! self.id = initdata['sid'] ! self.config = Config(initdata['ConfigFile']) ! path = '/ServiceConfig/AllowAnonymous' ! self.allow_anonymous = as_boolean(self.config.get_content(path, '0')[0]) ! ! self.storage_path = self.config.get_content('/ServiceConfig/DBConnect')[0] ! self.schedids = IDStore(self.storage_path) ! ! # Init descision maker backend ! path = '/ServiceConfig/DecisionMaker' ! dm_cfg = self.config.get_dictionaries(path)[0] ! #dm_class = import_class_from_string(dm_cfg['Class']) ! #self.decision_maker = dm_class(self.gis_proxy, dm_cfg) ! self.dm = DecisionMaker(self.gis_proxy, dm_cfg) ! ! # Init resource interface backend ! path = '/ServiceConfig/ResourceInterface' ! ri_cfg = self.config.get_dictionaries(path)[0] ! ri_class = import_class_from_string(ri_cfg['Class']) ! self.resource = ri_class(self.gis_proxy, self.dm, ri_cfg) ! ! # Init scheduler task ! path = '/ServiceConfig/SchedulerPeriod' ! sched_period = self.config.get_content(path) ! Task(self.id, self._sched).start(int(sched_period[0])) ! ! path = '/ServiceConfig/SuperUserDN' ! self.sudns = self.config.get_content(path,[]) ! if not self.sudns: ! log.error('No SuperUserDN found.') ! ! path = '/ServiceConfig/MonitorUserDN' ! self.monitordns = self.config.get_content(path,[]) ! if self.monitordns: ! log.msg('MonitorUserDN found:', self.monitordns) ! ! self._set_service_name() ! create_provision_task(initdata, self._get_description, message_for_logging = 'Sched') ! ! ! ! def _set_service_name(self): ! self.longname = "Super Scheduler Service" ! ! def _sched_delete(self): ! # Select all job wich is in DELETED state ! ids = self.schedids.select({'state' : BES_OVERALL_STATE_KILLED }) ! if len(ids) == 0: ! print 'there is no deleted job' ! return False ! self.resource.remove(ids, self.schedids) ! ! def _sched_status(self): ! idsel = self.schedids.select({}) ! ids = [ id for id in idsel if id['state'] not in \ ! [ BES_OVERALL_STATE_DONE, BES_OVERALL_STATE_KILLED ] ] ! if len(ids) == 0: ! print 'there is no running job' ! return False ! ! 
self.resource.status(ids, self.schedids) ! ! # def _sched_submit(self): ! # ids = self.resource.get_new_jobs(self.schedids) ! # if ids: ! # rlist = self.resource.get_valid_resources() ! # for resource_id, ids in self.decision_maker.decision(ids, rlist).iteritems(): ! # try: ! # self.resource.submit(resource_id, ids, self.schedids) ! # except Exception, error: ! # log.error('Cannot submit to', resource_id, nice_exception(error)) ! # continue ! ! def _sched_submit(self): ! map = self.dm.make_decisions(self.schedids) ! while len(map) > 0: ! res_jobs = {} ! try: ! for (job_id, candidate) in map.iteritems(): ! res = candidate.next() ! if not res_jobs.has_key(res): ! res_jobs[res] = [] ! res_jobs[res].append(self.schedids.get(job_id)) ! except: ! del map[job_id] ! for (resource_id, ids) in res_jobs.iteritems(): ! if self.resource.submit(resource_id, ids, self.schedids): ! for id in ids: ! del map[id['grid_id']] ! ! def _sched(self): ! """ Check lock for scheduler task and run real function """ ! if self.sched_run is True: ! print 'SuperScheduler._sched is still running...' ! else: ! self.sched_run = True ! print 'SuperScheduler._sched is starting...' ! self.dm.query_resources() ! self._sched_status() ! if len(self.dm.resource_list) > 0: ! self._sched_submit() ! self._sched_delete() ! print 'SuperScheduler._sched is done.' ! self.sched_run = False ! ! def _check_auth(self, auth): ! print auth ! if not self.allow_anonymous: ! #if auth == None: ! # raise UserNotAllowedException, 'no auth' ! #subject = auth.get_subject() ! #if subject == None: ! # raise UserNotAllowedException, 'no subject' ! #if subject.CN == None: ! # raise UserNotAllowedException, 'no CN' ! #if subject.CN == '': ! # raise UserNotAllowedException, 'empty CN' ! #if subject.CN == 'Roadwarrior': ! # raise UserNotAllowedException, 'Roadwarrior not allowed' ! if not auth['http'] and not auth['x509']: ! raise UserNotAllowedException, 'not authenticated' ! try: ! if auth['x509']: ! owner = str(auth['x509'].subject()) ! elif auth['http']: ! type, data = auth['http'].split(' ',1) ! if type == 'Basic': ! user, pw = base64.decodestring(data).split(':') ! owner = user ! else: ! raise UserNotAllowedException, 'not supported authentication method' ! else: ! owner = ANONYMOUS ! except UserNotAllowedException: ! raise ! except: ! log.error() ! owner = ANONYMOUS ! return owner ! ! def _own(self, owner, id): ! if not id.has_key('owner'): ! return False ! if id['owner'] in [None, ANONYMOUS, ROADWARRIOR] and \ ! owner in [None, ANONYMOUS, ROADWARRIOR]: ! return True ! if id['owner'] == None: ! return False ! return owner == id['owner'] ! ! def _cleanup(self): ! log.msg('Cleanup of Super Scheduler.') ! ! def _create_activity(self, grid_id, owner, jsdl_doc): ! try: ! stat, jobtype = get_job_type(jsdl_doc) ! ! if (not stat) or ( jobtype.upper() not in applicationtype_list): ! log.error("Error: %s type job sent to Super Scheduler Service" % jobtype) ! return False ! ! xml = SimpleXML(memory = jsdl_doc) ! nss = {'jsdl' : "http://schemas.ggf.org/jsdl/2005/11/jsdl"} ! jobname = xml.get_content('//jsdl:JobName', nss = nss)[0] ! id = { ! 'grid_id' : grid_id, ! 'jsdl' : jsdl_doc, ! 'state' : BES_OVERALL_STATE_NEW, ! 'owner' : owner, ! 'jobname' : jobname, ! 'sched_submit_time' : time.time(), ! 'status_long' : None, ! 'jobtype' : jobtype, ! 'run_arch' : None, ! 'run_os' : None } ! self.schedids.add(id) ! except Exception, e: ! log.error() ! ! return True ! ! # Other name of Submit job ! def CreateActivityFromJSDL(self, auth, jsdl_doc): ! 
owner = self._check_auth(auth) ! grid_id = self.id + '/' + mkuid() ! log.msg('New job from %s:' % owner, grid_id) ! if self._create_activity(grid_id, owner, jsdl_doc): ! return grid_id ! else: ! return None ! ! def _make_ActivityJSDLDocument(self, grid_id, jsdldoc): ! x = SimpleXML(memory = jsdldoc) ! x.get_node('/')[0].childNodes[0].tagName = 'ActivityJSDLDocument' ! x.insert_child('ActivityIdentifier',grid_id) ! s = str(x) ! del x ! return s ! ! def GetActivityJSDLDocuments(self, auth, grid_ids): ! owner = self._check_auth(auth) ! if len(grid_ids) == 0: ! # Get all job ! ids = self.schedids.select({}) ! else: ! ids = self.schedids.select_by_id_list(grid_ids) ! if owner not in self.sudns: ! # Filter user's jobs ! ids = [id for id in ids if self._own(owner,id)] ! return [self._make_ActivityJSDLDocument(id['grid_id'],id['jsdl']) \ ! for id in ids] ! ! def GetActivityStatus(self, auth, grid_ids): ! if isinstance(grid_ids, str): ! grid_ids = [grid_ids] ! owner = self._check_auth(auth) ! if len(grid_ids) == 0: ! # Get all job ! selects = self.schedids.select({}) ! else: ! selects = self.schedids.select_by_id_list(grid_ids) ! ! if owner not in self.sudns and owner not in self.monitordns: ! # Filter user's jobs ! selects = [id for id in selects if self._own(owner,id)] ! return [ self.resource.get_status(id) for id in selects ] ! ! def _stage_status_to_xml(self,inout,stages): ! return ''.join(['<Stage%sStatus id="%s" state="%s"/>' \ ! % (inout, name, data['state']) \ ! for name, data in stages.items()]) ! ! def _state_change(self, owner, state_change_request): ! state_change_response = [] ! if len(state_change_request) == 0: ! pass ! else: ! for request in state_change_request: ! #print 'state change request:', request ! if request[0] == '': ! selects = self.schedids.select({'owner' : owner}) ! for s in selects: ! s['state'] = int(request[1]['to']) ! state_change_response.append(\ ! (s['grid_id'], 'Succeeded')) ! s['status_long'] = '' ! self.schedids.commit(s['grid_id']) ! else: ! try: ! id = self.schedids.get(request[0]) ! if owner not in self.sudns: ! if not self._own(owner,id): ! state_change_response.append((request[0],'Not allowed')) ! continue ! id['state'] = int(request[1]['to']) ! state_change_response.append((request[0],'Succeeded')) ! id['status_long'] = '' ! self.schedids.commit(id['grid_id']) ! except: ! state_change_response.append((request[0],'Not found')) ! return state_change_response ! ! def RequestActivityStateChanges(self, auth, requests): ! owner = self._check_auth(auth) ! state_change_request = [] ! for request in requests: ! req_xml = SimpleXML(memory=request) ! state_change_request.append( \ ! (req_xml.get_content('//ActivityIdentifier',[''])[0], \ ! req_xml.get_attributes('//RequestedStateChange')[0])) ! #print 'state changes:', state_change_request ! return [\ ! ('<StateChangeResponse>' + \ ! '<ActivityIdentifier>%s</ActivityIdentifier>' + \ ! '<Response>%s</Response>' + \ ! '</StateChangeResponse>') \ ! % (id, resp) for (id, resp) \ ! in self._state_change (owner, state_change_request)] ! ! def _get_description(self, site_id): ! return """<?xml version='1.0'?> ! <ServiceDescription> ! <LongName>%s</LongName> ! <MajorVersion>0</MajorVersion> ! <MinorVersion>1</MinorVersion> ! <PatchVersion>0</PatchVersion> ! <Site>%s</Site> ! <WSDL></WSDL> ! <Semantics> ! Provide OGSA BES compatiblie interface to grid level ! job management ! </Semantics> ! </ServiceDescription>""" % (self.longname, site_id) |
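Editor's note on the Sched change above: sched.conf.default no longer names a single DecisionMaker Class; it now names a ResourceInterface and an Algorithm, and the rewritten _sched_submit drains the {job_id: candidate_generator} map returned by DecisionMaker.make_decisions(). The following is a minimal, hedged sketch of that draining loop in the same Python 2 style as the diff; drain_decisions, get_job and submit_batch are illustrative stand-ins (get_job for schedids.get(), submit_batch for self.resource.submit()), not names from the commit.

def drain_decisions(decisions, get_job, submit_batch):
    # decisions: {job_id: candidate_resource_generator}, the shape returned
    # by DecisionMaker.make_decisions() in the diff above.
    while len(decisions) > 0:
        res_jobs = {}
        for job_id, candidates in decisions.items():
            try:
                res = candidates.next()              # next candidate resource
            except StopIteration:
                del decisions[job_id]                # no candidates left for this job
                continue
            res_jobs.setdefault(res, []).append((job_id, get_job(job_id)))
        for resource_id, pairs in res_jobs.items():
            # Submit one batch per resource; jobs that were accepted stop
            # being considered, the rest retry with their next candidate.
            if submit_batch(resource_id, [job for (jid, job) in pairs]):
                for (jid, job) in pairs:
                    decisions.pop(jid, None)

# Toy example with finite candidate generators and an always-successful submit:
decisions = {'job-1': iter(['res-A', 'res-B']), 'job-2': iter(['res-A'])}
drain_decisions(decisions,
                get_job = lambda jid: {'grid_id': jid},
                submit_batch = lambda res, jobs: True)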
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:34
|
Update of /cvsroot/gug/gug/gug/service/jc/lrms In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/service/jc/lrms Modified Files: condor.py fork.py cm.py Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM Index: condor.py =================================================================== RCS file: /cvsroot/gug/gug/gug/service/jc/lrms/condor.py,v retrieving revision 1.16 retrieving revision 1.17 diff -C2 -d -r1.16 -r1.17 *** condor.py 31 Jan 2007 13:27:41 -0000 1.16 --- condor.py 27 Sep 2007 10:36:05 -0000 1.17 *************** *** 189,193 **** return False ! def get_ce_data(self): allinfo = sysinfo.all_info() operating_system = allinfo['os'] --- 189,193 ---- return False ! def get_ce_data(self, new_jobs = 0): allinfo = sysinfo.all_info() operating_system = allinfo['os'] *************** *** 218,224 **** <OwnedCpuNum>%s</OwnedCpuNum> <JobNum>%s</JobNum> </ComputingElement> """ % (operating_system, arch, cpunum, running_job, free_cpu_num, \ ! owned_cpu_num, jobnum) if __name__ == '__main__': --- 218,225 ---- <OwnedCpuNum>%s</OwnedCpuNum> <JobNum>%s</JobNum> + <NewJobNumber>%s</NewJobNumber> </ComputingElement> """ % (operating_system, arch, cpunum, running_job, free_cpu_num, \ ! owned_cpu_num, jobnum, new_jobs) if __name__ == '__main__': Index: fork.py =================================================================== RCS file: /cvsroot/gug/gug/gug/service/jc/lrms/fork.py,v retrieving revision 1.37 retrieving revision 1.38 diff -C2 -d -r1.37 -r1.38 *** fork.py 14 Mar 2007 06:41:55 -0000 1.37 --- fork.py 27 Sep 2007 10:36:05 -0000 1.38 *************** *** 85,89 **** return False ! def get_ce_data(self): if self.exec_proxy is None: return '' --- 85,89 ---- return False ! def get_ce_data(self, new_jobs = 0): if self.exec_proxy is None: return '' *************** *** 91,94 **** --- 91,95 ---- self.ce_os = ce_data['os'] self.ce_arch = ce_data['cpu_arch'] + ce_data['new_jobs'] = new_jobs return """<ComputingElement> <LRMSType>Fork</LRMSType> *************** *** 102,105 **** --- 103,107 ---- <LoadAvg>%(load_avg)s</LoadAvg> <RunningJob>%(running_jobs)s</RunningJob> + <NewJobNumber>%(new_jobs)s</NewJobNumber> <FreeCpuNum>%(free_cpu)s</FreeCpuNum> <OwnedCpuNum>%(owned_cpu)s</OwnedCpuNum> Index: cm.py =================================================================== RCS file: /cvsroot/gug/gug/gug/service/jc/lrms/cm.py,v retrieving revision 1.18 retrieving revision 1.19 diff -C2 -d -r1.18 -r1.19 *** cm.py 13 May 2007 21:55:36 -0000 1.18 --- cm.py 27 Sep 2007 10:36:05 -0000 1.19 *************** *** 83,89 **** st = self.cm.status([lrms_id])[0] break ! except: time.sleep(1) ! continue ret = {'grid_id' : st[0], 'status' : st[1]['status'], 'error_message' : st[1]['error_message']} --- 83,89 ---- st = self.cm.status([lrms_id])[0] break ! except Exception, error: time.sleep(1) ! continue ret = {'grid_id' : st[0], 'status' : st[1]['status'], 'error_message' : st[1]['error_message']} *************** *** 91,95 **** return ret ! def get_ce_data(self): if self.cm is None: return '' --- 91,95 ---- return ret ! def get_ce_data(self, new_jobs = 0): if self.cm is None: return '' *************** *** 114,117 **** --- 114,119 ---- xml += " <FreeCpuNum>%s" % item[1]['free_cpu'] xml += "</FreeCpuNum>\n" + xml += " <NewJobNumber>%s" % new_jobs + xml += "</NewJobNumber>" else: xml += "<ExecStatus>\n" |
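Editor's note: the common thread in the three LRMS backends above (condor.py, fork.py, cm.py) is a new optional new_jobs argument to get_ce_data(), surfaced as a <NewJobNumber> element in the ComputingElement description. A stripped-down illustration of the pattern follows; only <NewJobNumber> is taken from the commit, the remaining element names and values are an illustrative subset, not what the real backends report.

def get_ce_data(new_jobs = 0):
    # Placeholder values; the real backends read these from sysinfo, Condor,
    # or the cluster manager.
    operating_system, arch, cpunum, jobnum = 'LINUX', 'i686', 4, 2
    return """<ComputingElement>
  <OperatingSystem>%s</OperatingSystem>
  <CPUArchitecture>%s</CPUArchitecture>
  <CPUCount>%s</CPUCount>
  <JobNum>%s</JobNum>
  <NewJobNumber>%s</NewJobNumber>
</ComputingElement>
""" % (operating_system, arch, cpunum, jobnum, new_jobs)

print get_ce_data(new_jobs = 3)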
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:32
|
Update of /cvsroot/gug/gug/gug/service/exec In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/service/exec Modified Files: exec.py Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM Index: exec.py =================================================================== RCS file: /cvsroot/gug/gug/gug/service/exec/exec.py,v retrieving revision 1.57 retrieving revision 1.58 diff -C2 -d -r1.57 -r1.58 *** exec.py 19 Sep 2007 14:12:24 -0000 1.57 --- exec.py 27 Sep 2007 10:36:04 -0000 1.58 *************** *** 383,389 **** try: job= self.execids.get(pid) ! out.append({ 'grid_id': pid, 'status' : job['status'] , 'mem_usage': -1, 'cpu_usage':-1, 'exit_code': -1}) except: ! out.append({ 'grid_id': pid, 'status' : BES_OVERALL_STATE_UNKNOWN , 'mem_usage': -1, 'cpu_usage':-1, 'exit_code': -1}) return out --- 383,389 ---- try: job= self.execids.get(pid) ! out.append({ 'id': pid, 'status' : job['status'] , 'mem_usage': -1, 'cpu_usage':-1, 'exit_code': -1}) except: ! out.append({ 'id': pid, 'status' : BES_OVERALL_STATE_UNKNOWN , 'mem_usage': -1, 'cpu_usage':-1, 'exit_code': -1}) return out |
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:32
|
Update of /cvsroot/gug/gug/gug/common In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/common Modified Files: jsdl.py idstore.py Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM Index: idstore.py =================================================================== RCS file: /cvsroot/gug/gug/gug/common/idstore.py,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** idstore.py 30 Jan 2007 18:27:47 -0000 1.5 --- idstore.py 27 Sep 2007 10:36:03 -0000 1.6 *************** *** 8,12 **** import os ! class IDStore: def __init__(self, dirname): --- 8,55 ---- import os ! ! class AbstractQueue: ! '''Abstract Queue base class for services queue handling''' ! def __init__(self): ! self._ids = [] ! ! def add(self, value): ! self._ids.append(value) ! self.commit(self._ids.index(value)) ! ! def add_with_id(self, id, value): ! try: ! self._ids[id] = value ! return True ! except: ! return False ! ! def commit(self, id): ! return id < len(self._ids) ! ! def delete(self, id): ! del self._ids[id] ! self.commit(self._ids.index(value)) ! ! def delete_by_value(self, value): ! self._ids.remove(value) ! ! def get(self, id): ! return self._ids[id] ! ! def get_all(self): ! return self._ids ! ! def get_id(self, value): ! return self._ids.index(value) ! ! def select(self, filter): ! return [v for v in self._ids if v in filter] ! ! def select_by_id_list(self, ids): ! return [v for v in self._ids if self._ids.index(v) in ids] ! ! ! class IDStore(AbstractQueue): def __init__(self, dirname): *************** *** 60,66 **** --- 103,119 ---- self.commit(id['grid_id']) + def add_with_id(self, id, value): + value['grid_id'] = id + self.add(value) + def get(self, grid_id): return self._ids[grid_id] + def get_all(self): + return self._ids.values() + + def get_id(self, value): + return value.get('grid_id', None) + def delete(self, grid_id): try: *************** *** 70,73 **** --- 123,129 ---- pass self.commit(grid_id) + + def delete_by_value(self, value): + self.delete(value['grid_id']) def select(self, filter): Index: jsdl.py =================================================================== RCS file: /cvsroot/gug/gug/gug/common/jsdl.py,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** jsdl.py 30 Apr 2007 20:21:51 -0000 1.10 --- jsdl.py 27 Sep 2007 10:36:03 -0000 1.11 *************** *** 159,160 **** --- 159,191 ---- return False, '' + + def get_staging_info(jsdl_doc): + stageins = {} + stageouts = {} + try: + jsdl = SimpleXML(memory = jsdl_doc) + except SimpleXMLError, error: + log.error('JSDL parse error (%s)' % nice_exception(error)) + return {} + #path= '/jsdl:JobDefinition/jsdl:JobDescription/jsdl:DataStaging' + nss = { 'jsdl' : "http://schemas.ggf.org/jsdl/2005/11/jsdl", \ + 'jsdl-posix' : "http://schemas.ggf.org/jsdl/2005/06/jsdl-posix", \ + 'jsdl-hpcpa' : 'http://schemas.ogf.org/jsdl/2006/07/jsdl-hpcpa',\ + 'jsdl-gug' : 'http://gug.grid.niif.hu/jsdl/2005/11/jsdl-gug' } + try: + staging = jsdl.get_dictionaries('//jsdl:DataStaging', nss = nss) + for stage in staging: + if stage.has_key('jsdl:FileName'): + if stage.has_key('jsdl:Source'): + stageins[stage['jsdl:FileName']] = \ + stage['jsdl:Source'] + if stage.has_key('jsdl:Target'): + stageouts[stage['jsdl:FileName']] = \ + stage['jsdl:Target'] + # XXX: more specified excpetion handling required + except Exception, error: + log.error() + stageins = {} + stageouts = {} + + return stageins, stageouts |
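Editor's note: two things land in gug/common here, get_staging_info() in jsdl.py and an AbstractQueue base class in idstore.py that IDStore now derives from. The queue is what each service hands to DecisionMaker.make_decisions(), so its contract matters. Below is a condensed rendering of the list-backed base class taken from the diff (the buggy add_with_id/delete variants are omitted), plus a trivial driver that is purely illustrative.

class AbstractQueue:
    '''List-backed queue base class; IDStore overrides these methods with
    grid_id-keyed storage, as shown in the idstore.py diff above.'''
    def __init__(self):
        self._ids = []
    def add(self, value):
        self._ids.append(value)
        self.commit(self._ids.index(value))
    def commit(self, id):
        return id < len(self._ids)
    def get(self, id):
        return self._ids[id]
    def get_all(self):
        return self._ids
    def get_id(self, value):
        return self._ids.index(value)
    def select(self, filter):
        return [v for v in self._ids if v in filter]
    def select_by_id_list(self, ids):
        return [v for v in self._ids if self._ids.index(v) in ids]

# Illustrative driver (not part of the commit):
q = AbstractQueue()
q.add(14231)
q.add(13231453)
print q.get_all()          # [14231, 13231453]
print q.get_id(13231453)   # 1
print q.select([14231])    # [14231]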
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:29
|
Update of /cvsroot/gug/gug/gug/service/sched/resource_iface In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/service/sched/resource_iface Modified Files: job_controller.py Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM Index: job_controller.py =================================================================== RCS file: /cvsroot/gug/gug/gug/service/sched/resource_iface/job_controller.py,v retrieving revision 1.20 retrieving revision 1.21 diff -C2 -d -r1.20 -r1.21 *** job_controller.py 3 Apr 2007 12:48:46 -0000 1.20 --- job_controller.py 27 Sep 2007 10:36:07 -0000 1.21 *************** *** 22,26 **** jc = id_or_jc if jc: ! meta_res = self.resource_list.get(jc.split('/')[0],None) if meta_res: try: --- 22,26 ---- jc = id_or_jc if jc: ! meta_res = self.dm.resource_list.get(jc.split('/')[0],None) if meta_res: try: *************** *** 49,59 **** metadata_node.appendChild(node) ! def __init__(self, gis_proxy, config): self.gis = gis_proxy self.config = config ! self.resource_list = {} ! self.blacklist = {} ! self.blacklist_initial_value = 10 def get_resources(self): """ Get Computing Element resources from local GIS """ --- 49,61 ---- metadata_node.appendChild(node) ! def __init__(self, gis_proxy, dm, config): self.gis = gis_proxy self.config = config ! self.dm = dm ! #self.resource_list = {} ! #self.blacklist = {} ! #self.blacklist_initial_value = 10 + #TODO: Unused method def get_resources(self): """ Get Computing Element resources from local GIS """ *************** *** 93,97 **** def submit(self, resource_id, ids, schedids): try: ! url = self.resource_list[resource_id][0]['urls'][0] except Exception, error: return False --- 95,99 ---- def submit(self, resource_id, ids, schedids): try: ! url = self.dm.resource_list[resource_id][0]['urls'][0] except Exception, error: return False *************** *** 104,110 **** if rejected: raise Exception, 'job rejected' ! if self.blacklist.get(resource_id,0) > 0: print 'jc (%s) is blacklisted (%d), cannot submit:' \ ! % (jc_name, self.blacklist[resource_id]), id['grid_id'] continue log.msg('Submitting job to %s:' % jc_name, id['grid_id']) --- 106,112 ---- if rejected: raise Exception, 'job rejected' ! if self.dm.blacklisted(resource_id): print 'jc (%s) is blacklisted (%d), cannot submit:' \ ! % (jc_name, self.dm.blacklist[resource_id]), id['grid_id'] continue log.msg('Submitting job to %s:' % jc_name, id['grid_id']) *************** *** 121,124 **** --- 123,131 ---- if id['jobtype'] == 'Compiler': id['architectures'][jc_arch][jc_os]['state'] = COMPILER_STATE_COMPILING + except (socket.error, UnknownServiceError), error: + id['error'] = nice_exception(error) + ' (jc is %s)' % jc_name + log.error('Cannot submit job: %s (%s)' % (id['grid_id'], id['error'])) + self.dm.put_blacklist(resource_id) + print 'jc (%s) is blacklisted' % jc_name except Exception, error: id['error'] = nice_exception(error) + ' (jc is %s)' % jc_name *************** *** 126,134 **** log.error('Cannot submit job: %s (%s)' % \ (id['grid_id'], id['error']) ) - if str(error).find('Connection refused') > -1 \ - or str(error).find('timed out') > -1 \ - or str(error).find('sorry, no service is running here') > -1: - self.blacklist[resource_id] = self.blacklist_initial_value - print 'jc (%s) is blacklisted' % jc_name else: print 'Cannot submit job: %s (%s)' % (id['grid_id'], id['error']) --- 133,136 ---- *************** *** 151,155 **** jcid = id['job_id'].split('/')[0] try: ! 
url = self.resource_list[jcid][0]['urls'][0] except: continue --- 153,157 ---- jcid = id['job_id'].split('/')[0] try: ! url = self.dm.resource_list[jcid][0]['urls'][0] except: continue *************** *** 157,163 **** jc_name = self._jc_name(jcid) #print statechangexml ! if self.blacklist.get(jcid, 0) > 0: print 'jc (%s) is blacklisted (%d), cannot send kill:' \ ! % (jc_name, self.blacklist[jcid]), id['grid_id'] else: log.msg('Asking resource (%s) to remove job (id on resource is %s):' \ --- 159,165 ---- jc_name = self._jc_name(jcid) #print statechangexml ! if self.dm.blacklisted(jcid): print 'jc (%s) is blacklisted (%d), cannot send kill:' \ ! % (jc_name, self.dm.blacklist[jcid]), id['grid_id'] else: log.msg('Asking resource (%s) to remove job (id on resource is %s):' \ *************** *** 191,197 **** def status(self, ids, schedids): - for url in self.blacklist.keys(): - if self.blacklist[url] > 0: - self.blacklist[url] = self.blacklist[url] - 1 ids_valid = [ id for id in ids if id.get('job_id', None) ] --- 193,196 ---- *************** *** 204,208 **** for jcid in jcs.keys(): try: ! url = self.resource_list[jcid][0]['urls'][0] except: log.error('Resource not found (id: %s)' % jcid) --- 203,207 ---- for jcid in jcs.keys(): try: ! url = self.dm.resource_list[jcid][0]['urls'][0] except: log.error('Resource not found (id: %s)' % jcid) *************** *** 216,223 **** jc = get_client(url) jc_name = self._jc_name(jcid) ! if self.blacklist.get(jcid,0) > 0: for id in jcs[jcid]: print 'jc (%s) is blacklisted (%d), cannot get status:' \ ! % (jc_name, self.blacklist[jcid]), id['grid_id'] id['status_long'] = None if id['state'] == BES_OVERALL_STATE_RUNNING: --- 215,222 ---- jc = get_client(url) jc_name = self._jc_name(jcid) ! if self.dm.blacklisted(jcid): for id in jcs[jcid]: print 'jc (%s) is blacklisted (%d), cannot get status:' \ ! % (jc_name, self.dm.blacklist[jcid]), id['grid_id'] id['status_long'] = None if id['state'] == BES_OVERALL_STATE_RUNNING: *************** *** 230,241 **** try: sls = jc.GetActivityStatus(jc_ids.keys()) ! except Exception, error: nice_error = nice_exception(error) print 'cannot get status from %s (%s)' % (jc_name, nice_error) ! if str(error).find('Connection refused') > -1 \ ! or str(error).find('timed out') > -1 \ ! or str(error).find('sorry, no service is running here') > -1: ! self.blacklist[jcid] = self.blacklist_initial_value ! print 'jc (%s) is blacklisted' % jc_name for id in jcs[jcid]: id['status_long'] = None --- 229,241 ---- try: sls = jc.GetActivityStatus(jc_ids.keys()) ! except (socket.error, UnknownServiceError), error: nice_error = nice_exception(error) print 'cannot get status from %s (%s)' % (jc_name, nice_error) ! self.dm.put_blacklist(jcid) ! print 'jc (%s) is blacklisted' % jc_name ! continue ! except Exception, error: ! nice_error = nice_exception(error) ! print 'cannot get status from %s (%s)' % (jc_name, nice_error) for id in jcs[jcid]: id['status_long'] = None *************** *** 284,288 **** schedids.commit(id['grid_id']) return True ! def get_new_jobs(self, schedids): ids = schedids.select({ 'state' : BES_OVERALL_STATE_NEW }) --- 284,289 ---- schedids.commit(id['grid_id']) return True ! ! #TODO: delete - unused method def get_new_jobs(self, schedids): ids = schedids.select({ 'state' : BES_OVERALL_STATE_NEW }) *************** *** 300,312 **** return False return ids ! def get_valid_resources(self): vr = {} ! for res in self.resource_list.keys(): ! if self.resource_list[res][1].has_key('FreeCpuNum'): ! 
if (int(self.resource_list[res][1]['FreeCpuNum']) > 0) and \ ! (self.blacklist.get(res, 0) == 0): vr[res] = [] ! vr[res].append(self.resource_list[res][1]) return vr --- 301,314 ---- return False return ids ! ! #TODO: delete - unused method def get_valid_resources(self): vr = {} ! for res in self.dm.resource_list.keys(): ! if self.dm.resource_list[res][1].has_key('FreeCpuNum'): ! if (int(self.dm.resource_list[res][1]['FreeCpuNum']) > 0) and \ ! (self.dm.blacklist.get(res, 0) == 0): vr[res] = [] ! vr[res].append(self.dm.resource_list[res][1]) return vr |
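Editor's note: the recurring pattern in job_controller.py above is that resource lookups now go through self.dm.resource_list, and transport failures are no longer detected by string-matching exception text; they are caught as (socket.error, UnknownServiceError) and the resource is handed to self.dm.put_blacklist(). A minimal sketch of that pattern follows; DummyDM, submit_with_blacklist and failing_submit are illustrative stand-ins for the DecisionMaker blacklist interface, not gug code, and only socket.error is used so the snippet runs without the gug exceptions module.

import socket

class DummyDM:
    '''Stand-in for the blacklist interface of gug.module.dm.dm.DecisionMaker.'''
    blacklist_initial_value = 5
    def __init__(self):
        self.blacklist = {}
    def put_blacklist(self, resource):
        self.blacklist[resource] = self.blacklist_initial_value
    def blacklisted(self, resource):
        return self.blacklist.get(resource, 0) > 0

def submit_with_blacklist(dm, resource_id, do_submit):
    if dm.blacklisted(resource_id):
        print 'resource (%s) is blacklisted, skipping' % resource_id
        return False
    try:
        return do_submit(resource_id)
    except socket.error, error:          # connection-level failure -> blacklist
        dm.put_blacklist(resource_id)
        print 'resource (%s) is blacklisted:' % resource_id, error
        return False

dm = DummyDM()
def failing_submit(rid):
    raise socket.error('Connection refused')
submit_with_blacklist(dm, 'jc-01', failing_submit)
print dm.blacklisted('jc-01')            # True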
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:11
|
Update of /cvsroot/gug/gug/gug/module/dm In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/module/dm Added Files: dm.py dm_main.py __init__.py Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM --- NEW FILE: dm.py --- '''Decision Maker Module ''' __revision__ = '$Revision: 1.1 $' from gug.common.jsdl import * from gug.common.simplexml import SimpleXML, SimpleXMLError from gug.common.share import import_class_from_string, nice_exception from gug.common import log from gug.common.idstore import IDStore, AbstractQueue from gug.common.config import Config import time class DecisionMaker: '''Decision Maker class''' def __init__(self, gis_proxy, config): self.gis_proxy = gis_proxy self.config = config #init resource backend resource_class = import_class_from_string(self.config['ResourceInterface']) self.resource = resource_class(self, gis_proxy, self.config) #init ordering algorithm order_class = import_class_from_string(self.config['Algorithm']) self.order = order_class(self, gis_proxy, self.config) #initial blacklist value self.blacklist_initial_value = 5 self.resource_list = {} self.blacklist = {} def query_resources(self): '''Querying resources from local GIS''' self._update_cache() self._refresh_black_list() self.resource.query_resources() def _change_states(self, queue): '''Processing status changes if needed''' self.resource.change_states(queue) def _update_cache(self): ''' Refreshing resource's timelife in the resource cache. The expired resources will be removed. ''' for res in self.resource_list.keys(): self.resource_list[res][0]['timelife'] = \ str(int(self.resource_list[res][0]['timelife']) - (int(time.time()) -\ int(self.resource_list[res][0]['timestamp']))) if int(self.resource_list[res][0]['timelife']) < 0: print 'resource (%s) timelife expired' % res del self.resource_list[res] def _refresh_black_list(self): '''Refreshing blacklisted resources''' for res in self.blacklist.keys(): self.blacklist[res] -= 1 if self.blacklist[res] <= 0: print 'resource (%s) removed from blacklist' % res del self.blacklist[res] def put_blacklist(self, resource): '''Put the given resource on blacklist''' self.blacklist[resource] = self.blacklist_initial_value def blacklisted(self,resource): '''Return True if resource is on blacklist''' return self.blacklist.get(resource, 0) > 0 def make_decisions(self, queue): ''' Map the queue item's with candidate set generator Input: scheduler's queue (an descendand of AbtractQueue class) Result: dictionary of {id: candidate_generator_object} ''' ret = {} self._change_states(queue) for item in self.resource.get_scheduling_items(queue): csg = self.resource.generate_candidate_set(item) filter = self.resource.get_resource_filter(item) candidates = self.resource.get_next_candidate(self.order.order(csg, filter), item) ret[queue.get_id(item)] = candidates return ret --- NEW FILE: dm_main.py --- from gug.client.soap import get_client from gug.module.dm.dm import DecisionMaker from gug.common.config import Config from gug.common.idstore import * if __name__ == '__main__': #cfg = Config('/home/totha/gug/gug/service/sched/sched.conf').get_dictionaries('/ServiceConfig/DecisionMaker')[0] #cfg = Config('/home/totha/gug/gug/service/cm/cm.conf').get_dictionaries('/ServiceConfig/DecisionMaker')[0] #cfg.update({'Architecture' : 'i686', 'OperatingSystem' : 'LINUX', 'TimeOut' : 600}) cfg = Config('/home/totha/gug/gug/service/stm/stm.conf.default').get_dictionaries('/ServiceConfig/DecisionMaker')[0] dm = 
DecisionMaker(get_client('http://localhost:21111/GIS'), cfg) dm.query_resources() #ids = IDStore("/home/totha/proba") ids = AbstractQueue() #ids.add( {'grid_id': 12234, 'status_long': None, 'jsdl': u'<?xml version="1.0" encoding="UTF-8"?>\n <jsdl:JobDefinition xmlns="http://schemas.ggf.org/jsdl/2005/11/jsdl"\n xmlns:jsdl="http://schemas.ggf.org/jsdl/2005/11/jsdl"\n xmlns:jsdl-gug="http://gug.grid.niif.hu/jsdl/2005/11/jsdl-gug"\n xmlns:jsdl-hpcpa="http://schemas.ogf.org/jsdl/2006/07/jsdl-hpcpa"\n targetNamespace="http://schemas.ggf.org/jsdl/2005/11/jsdl">\n <jsdl:JobDescription>\n <jsdl:JobIdentification>\n <jsdl:JobName>submit file example</jsdl:JobName>\n <jsdl:Description>This is a test job</jsdl:Description>\n <jsdl:JobProject>gug</jsdl:JobProject>\n </jsdl:JobIdentification>\n \n <jsdl:Application>\n <jsdl:ApplicationName>submit file example</jsdl:ApplicationName>\n <jsdl-hpcpa:BasicHPCApplication>\n <jsdl-hpcpa:Executable>bin/test.sh</jsdl-hpcpa:Executable><jsdl-hpcpa:Input>input/submit file example.stdin</jsdl-hpcpa:Input><jsdl-hpcpa:Output>output/output.txt</jsdl-hpcpa:Output>\n <jsdl-hpcpa:Error>output/error.txt</jsdl-hpcpa:Error>\n <jsdl-hpcpa:WorkingDirectory>\n jobdir/\n </jsdl-hpcpa:WorkingDirectory><jsdl-hpcpa:UserName>gug</jsdl-hpcpa:UserName>\n </jsdl-hpcpa:BasicHPCApplication><jsdl-gug:ApplicationType checkpoint="0">Binary</jsdl-gug:ApplicationType> \n </jsdl:Application><jsdl:Resources><jsdl:OperatingSystem><jsdl:OperatingSystemType><jsdl:OperatingSystemName>LINUX</jsdl:OperatingSystemName></jsdl:OperatingSystemType></jsdl:OperatingSystem><jsdl:CPUArchitecture><jsdl:CPUArchitectureName>other</jsdl:CPUArchitectureName><tns:OtherCPUArchitectures xmlns:tns="other_namespace">i686</tns:OtherCPUArchitectures></jsdl:CPUArchitecture><jsdl:TotalCPUCount><jsdl:Exact>1</jsdl:Exact></jsdl:TotalCPUCount><jsdl:DataStaging><jsdl:FileName>jobdir</jsdl:FileName><jsdl:Source><jsdl:URI>file:///grid/jobs/e/ed765d82-c6f6-4373-88a4-a80263dca90c</jsdl:URI></jsdl:Source></jsdl:DataStaging><jsdl:DataStaging><jsdl:FileName>jobdir/output</jsdl:FileName><jsdl:Target><jsdl:URI>file:///grid/jobs/e/ed765d82-c6f6-4373-88a4-a80263dca90c</jsdl:URI></jsdl:Target></jsdl:DataStaging><jsdl-gug:SoftwarePackage>\n <jsdl-gug:Name>None</jsdl-gug:Name>\n </jsdl-gug:SoftwarePackage></jsdl:Resources></jsdl:JobDescription></jsdl:JobDefinition>', 'run_os': None, 'sched_submit_time': 1188620417.373487, 'run_arch': None, 'jobname': u'submit file example', 'state': 2, 'owner': 'anonymous', 'grid_id': u'localhost-SuperScheduler/cab7076c-dd67-41c6-8e81-d9bffc2c08d7', 'jobtype': u'Binary'}) #ids.add({'grid_id': 1234, 'status': 2, 'jsdl': u'<?xml version="1.0" encoding="UTF-8"?>\n <jsdl:JobDefinition xmlns="http://schemas.ggf.org/jsdl/2005/11/jsdl"\n xmlns:jsdl="http://schemas.ggf.org/jsdl/2005/11/jsdl"\n xmlns:jsdl-gug="http://gug.grid.niif.hu/jsdl/2005/11/jsdl-gug"\n xmlns:jsdl-hpcpa="http://schemas.ogf.org/jsdl/2006/07/jsdl-hpcpa"\n targetNamespace="http://schemas.ggf.org/jsdl/2005/11/jsdl">\n <jsdl:JobDescription>\n <jsdl:JobIdentification>\n <jsdl:JobName>test</jsdl:JobName>\n <jsdl:Description>This is a test job</jsdl:Description>\n <jsdl:JobProject>gug</jsdl:JobProject>\n </jsdl:JobIdentification>\n \n <jsdl:Application>\n <jsdl:ApplicationName>test</jsdl:ApplicationName>\n <jsdl-hpcpa:BasicHPCApplication>\n <jsdl-hpcpa:Executable>bin/test.sh</jsdl-hpcpa:Executable><jsdl-hpcpa:Output>output/output.txt</jsdl-hpcpa:Output>\n <jsdl-hpcpa:Error>error/error.txt</jsdl-hpcpa:Error>\n <jsdl-hpcpa:WorkingDirectory>\n 
/home/totha/temp/test_job\n </jsdl-hpcpa:WorkingDirectory><jsdl-hpcpa:Environment name="JAVA_HOME">/opt/j2se</jsdl-hpcpa:Environment><jsdl-hpcpa:Environment name="PVM_ROOT">/home/gug</jsdl-hpcpa:Environment><jsdl-hpcpa:UserName>gug</jsdl-hpcpa:UserName>\n </jsdl-hpcpa:BasicHPCApplication><jsdl-gug:ApplicationType checkpoint="0">batch</jsdl-gug:ApplicationType> \n </jsdl:Application><jsdl:Resources><jsdl:OperatingSystem><jsdl:OperatingSystemType><jsdl:OperatingSystemName>LINUX</jsdl:OperatingSystemName></jsdl:OperatingSystemType></jsdl:OperatingSystem><jsdl:CPUArchitecture><jsdl:CPUArchitectureName>other</jsdl:CPUArchitectureName><tns:OtherCPUArchitectures xmlns:tns="other_namespace">i686</tns:OtherCPUArchitectures></jsdl:CPUArchitecture><jsdl:TotalCPUCount><jsdl:Exact>1</jsdl:Exact></jsdl:TotalCPUCount><jsdl-gug:SoftwarePackage>\n <jsdl-gug:Name>None</jsdl-gug:Name>\n </jsdl-gug:SoftwarePackage></jsdl:Resources></jsdl:JobDescription></jsdl:JobDefinition>', 'exec_id': False, 'cpu_usage': '-1', 'exec_job_id': False, 'error_message': '', 'last_check_time': 1188619008.5796659, 'exit_code': None, 'mem_usage': '-1', 'grid_id': '711c1f59-0fcb-4a9b-8268-efcdb5666357', 'submit_time': 1188619008.57967, 'exec_submit_time': 1188619008.5796731, 'desc': {'totalphysicalmemory': '0', 'candidatehosts': '127.0.0.1', 'ckpt_period': 10000, 'totalcpucount': u'1', 'operatingsystem': u'LINUX', 'environment': {u'JAVA_HOME': u'/opt/j2se', u'PVM_ROOT': u'/home/gug'}, 'executable': u'bin/test.sh', 'totalvirtualmemory': '0', 'checkpoint': '1', 'arguments': [], 'input': None, 'job_migration': 0, 'username': u'gug', 'description': u'This is a test job', 'jobproject': u'gug', 'individualphysicalmemory': '0', 'directory_structure': False, 'batchjob': True, 'initialdir': u'/home/totha/temp/test_job', 'individualvirtualmemory': '0', 'name': u'test', 'universe': u'batch', 'architecture': '*', 'error': u'error/error.txt', 'exclusiveexecution': '0', 'output': u'output/output.txt'}}) ids.add(14231) ids.add(13231453) map = dm.make_decisions(ids) #print map while len(map) > 0: try: for item in map.keys(): res = map[item].next() print item, res except: del map[item] # # dm.query_resources() # map = dm.make_decisions(ids) # former = None # while len(map) > 0: # # try: # for item in map.keys(): # res = map[item].next() # if former: # print former, dm.resource_list[former][1]['FreeJobPlace'] # print res, dm.resource_list[res][1]['FreeJobPlace'], item # former = res # except: # del map[item] --- NEW FILE: __init__.py --- '''Decision Maker module''' __revision__ = '$Revision: 1.1 $' |
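Editor's note: dm_main.py above is the manual test driver for the new module. Since it is long and mostly commented-out JSDL, here is a condensed usage sketch of the same flow. It assumes the gug package and the resource/algorithm backends named in this commit set are importable and that a GIS endpoint is running at the URL used in dm_main.py; the queue item is a placeholder, real service queues hold full job records, so this is a sketch rather than a runnable test.

from gug.client.soap import get_client
from gug.module.dm.dm import DecisionMaker
from gug.common.idstore import AbstractQueue

cfg = {
    # Class paths taken from the new sched.conf.default in this commit set.
    'ResourceInterface': 'gug.module.dm.resource.computing.SuperScheduler',
    'Algorithm': 'gug.module.dm.algorithm.simple.RandomOrder',
}
dm = DecisionMaker(get_client('http://localhost:21111/GIS'), cfg)
dm.query_resources()                      # refresh resource cache and blacklist

queue = AbstractQueue()
queue.add({'grid_id': 'example-job-1'})   # illustrative queue item

decisions = dm.make_decisions(queue)      # {queue id: candidate generator}
while len(decisions) > 0:
    for job_id in decisions.keys():
        try:
            print job_id, decisions[job_id].next()
        except StopIteration:
            del decisions[job_id]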
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:11
|
Update of /cvsroot/gug/gug/gug/module In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/module Added Files: __init__.py Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM --- NEW FILE: __init__.py --- '''GUG service modules''' __revision__ = '$Revision: 1.1 $' |
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:11
|
Update of /cvsroot/gug/gug/gug/service/jc In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/service/jc Modified Files: jc.py Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM Index: jc.py =================================================================== RCS file: /cvsroot/gug/gug/gug/service/jc/jc.py,v retrieving revision 1.63 retrieving revision 1.64 diff -C2 -d -r1.63 -r1.64 *** jc.py 30 Apr 2007 20:23:36 -0000 1.63 --- jc.py 27 Sep 2007 10:36:06 -0000 1.64 *************** *** 761,764 **** --- 761,766 ---- def _get_description(self, site_id): try: + new_jobs = len(self.jcids.select({'state' : [BES_OVERALL_STATE_NEW, \ + BES_OVERALL_STATE_STAGED_IN, BES_OVERALL_STATE_STARTING]})) return """<?xml version='1.0'?> <ServiceDescription> *************** *** 777,781 **** %s </ServiceDescription>""" % (site_id, self.capabilities, \ ! self.lrms.get_ce_data(), \ self.rt.get_disk_data(), self._get_gis_plugins_data()) --- 779,783 ---- %s </ServiceDescription>""" % (site_id, self.capabilities, \ ! self.lrms.get_ce_data(new_jobs), \ self.rt.get_disk_data(), self._get_gis_plugins_data()) |
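Editor's note: the jc.py change above counts jobs still in the NEW, STAGED_IN and STARTING states and passes that count to lrms.get_ce_data(new_jobs), so the advertised <NewJobNumber> reflects the backlog. A tiny illustration of the counting step; PENDING and the sample records are stand-ins for the BES_OVERALL_STATE_* constants and self.jcids.

PENDING = ['New', 'StagedIn', 'Starting']                # stand-in state names
jobs = [{'state': 'New'}, {'state': 'Running'}, {'state': 'Starting'}]
new_jobs = len([j for j in jobs if j['state'] in PENDING])
print new_jobs    # 2 -- the value then passed to self.lrms.get_ce_data(new_jobs)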
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:11
|
Update of /cvsroot/gug/gug/gug/service/cm In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/service/cm Modified Files: cm.conf.default cm.py Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM Index: cm.conf.default =================================================================== RCS file: /cvsroot/gug/gug/gug/service/cm/cm.conf.default,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** cm.conf.default 22 Apr 2007 21:27:00 -0000 1.9 --- cm.conf.default 27 Sep 2007 10:36:06 -0000 1.10 *************** *** 5,8 **** --- 5,12 ---- </CKPT> <SchedulerPeriod>20</SchedulerPeriod> + <DecisionMaker> + <ResourceInterface>gug.module.dm.resource.computing.ClusterManager</ResourceInterface> + <Algorithm>gug.module.dm.algorithm.simple.RandomOrder</Algorithm> + </DecisionMaker> <TimeOut>600</TimeOut> <DBConnect>cm_data</DBConnect> Index: cm.py =================================================================== RCS file: /cvsroot/gug/gug/gug/service/cm/cm.py,v retrieving revision 1.41 retrieving revision 1.42 diff -C2 -d -r1.41 -r1.42 *** cm.py 13 May 2007 22:13:56 -0000 1.41 --- cm.py 27 Sep 2007 10:36:06 -0000 1.42 *************** *** 16,20 **** --- 16,22 ---- from gug.common.simplexml import SimpleXML, SimpleXMLError import urlparse + import socket from gug.common.jsdl_parser import * + from gug.module.dm.dm import DecisionMaker *************** *** 25,29 **** """ ! def __init__(self, initdata): --- 27,33 ---- """ ! ! cm_run = False ! def __init__(self, initdata): *************** *** 45,48 **** --- 49,57 ---- self.os = self.config.get_content(path2)[0].strip() storage_path = self.config.get_content('/ServiceConfig/DBConnect')[0] + #Initializing Decision Maker + dm_cfg = self.config.get_dictionaries('/ServiceConfig/DecisionMaker')[0] + dm_cfg.update({'Architecture' : self.arch, 'OperatingSystem' : self.os, 'TimeOut' : self.timeout}) + self.dm = DecisionMaker(self.gis, dm_cfg) + self.cmids = IDStore(storage_path) *************** *** 51,55 **** self.ownedcpunum = 0 self.runningjob = 0 ! self.cpucount = {} self.lastexeceslog = '' --- 60,64 ---- self.ownedcpunum = 0 self.runningjob = 0 ! #self.cpucount = {} self.lastexeceslog = '' *************** *** 59,63 **** create_provision_task(initdata, self._get_description, message_for_logging = 'CM') ! def _check(self, job): if time.time() - job['last_check_time'] >= self.timeout: --- 68,72 ---- create_provision_task(initdata, self._get_description, message_for_logging = 'CM') ! #TODO: delete this, it is unused now def _check(self, job): if time.time() - job['last_check_time'] >= self.timeout: *************** *** 65,68 **** --- 74,78 ---- job['status'] = BES_OVERALL_STATE_NEW + #TODO: delete this, it is unused now def _timeout_test(self): """ *************** *** 130,134 **** raise print 'Timeout test completted!' ! def _candidate_generator(self, good_proxies, job_id): print 'looking for candidate for job', job_id --- 140,205 ---- raise print 'Timeout test completted!' ! ! def _sched_status(self): ! """ ! This procedure will check the running jobs status, if there is error ! with the job, then the resource will be blacklisted ! """ ! ids = self.cmids.select({'status' : [BES_OVERALL_STATE_RUNNING, BES_OVERALL_STATE_UNKNOWN]}) ! #ids_valid = [ id for id in ids if id.get('exec_job_id', None) ] ! execes = {} ! error = False ! for job in ids: ! exec_id = job['exec_id'] ! if not execes.has_key(exec_id): ! execes[exec_id] = [] ! execes[exec_id].append(job) ! ! 
for exec_id in execes.keys(): ! error = False ! try: ! if self.dm.blacklisted(exec_id): ! raise Exception, 'exec (%s) on blacklist (%s)' % (exec_id, self.dm.blacklist[exec_id]) ! exec_proxy = self.execes[exec_id] ! statuses = exec_proxy.status([id['exec_job_id'] for id in execes[exec_id]]) ! for status in statuses: ! id = execes[exec_id][statuses.index(status)] ! id['last_check_time'] = time.time() ! id['mem_usage'] = status['mem_usage'] ! id['cpu_usage'] = status['cpu_usage'] ! id['exit_code'] = status['exit_code'] ! new_status =status['status'] ! if id['status'] != new_status: ! log.msg('job\'s status changed from %s to %s: %s' % \ ! (overall_states[id['status']], overall_states[new_status], id['grid_id'])) ! id['status'] = new_status ! if status == BES_OVERALL_STATE_TERMINATED: ! try: ! id['error_message'] = \ ! file(os.path.join(id['desc']['initialdir'],id['desc']['error'])).read() ! except IOError, error: #There is no error file ! pass ! except: ! log.error() ! log.info('Job (%s) FAILED on: %s' % (id['grid_id'], exec_id)) ! self.cmids.commit(id['grid_id']) ! except KeyError: ! print 'error while getting statuses from (%s): Exec not found!' % exec_id ! error = True ! except (socket.error, UnknownServiceError), e: ! print 'error while getting statuses from (%s): Connection problem (%s)' % (exec_id, e) ! print 'exec (%s) is blacklisted!' % exec_id ! self.dm.put_blacklist(exec_id) ! error = True ! except Exception, e: ! print 'error while getting statuses from (%s) :' % exec_id, e ! error = True ! ! if error: ! for id in execes[exec_id]: ! id['status'] = BES_OVERALL_STATE_UNKNOWN ! self.cmids.commit(id['grid_id']) ! ! #TODO: delete, not used def _candidate_generator(self, good_proxies, job_id): print 'looking for candidate for job', job_id *************** *** 199,292 **** _sched() """ ! ! try: ! self._sched_delete() self._get_exec_proxies() ! self._timeout_test() ! ! good_proxies = self._choice_proxy() ! if len(good_proxies) == 0: ! return ! ! for item in self.cmids.select({'status' : BES_OVERALL_STATE_NEW}): ! result = False ! candidates = self._candidate_generator(good_proxies, item['grid_id']) ! print 'list of candidates for job %s:' % item['grid_id'], candidates ! if len(candidates) == 0: ! continue ! while len(candidates) > 0: ! random.shuffle(candidates) ! exec_id = candidates.pop() ! if item['desc'].has_key('candidatehosts'): ! if urlparse.urlparse(self.execes[exec_id]._url)[1].split(':')[0] \ ! not in item['desc']['candidatehosts']: ! print 'This host is not in the CandidateHosts list: %s' % self.execes[exec_id]._url ! continue ! try: ! if item['exec_job_id'] != False and item['desc']['batchjob'] == False and item['desc']['checkpoint'] == '1': ! try: ! item['status'] = BES_OVERALL_STATE_MIGRATION ! result = good_proxies[exec_id]['proxy'].revoke_repair(item['exec_job_id'], item['desc']) ! if result == True: ! item['status'] = BES_OVERALL_STATE_RUNNING ! item['cpu_usage'] = 'Unknown' ! item['mem_usage'] = 'Unknown' ! result = item['exec_job_id'] ! if item['exec_id'] != exec_id: ! log.msg('The %s job MIGRATED from [Execid: %s] to [Execid: %s]' % (item['grid_id'], item['exec_id'], exec_id)) ! log.msg('Job REVOKED: %s'% item['grid_id']) ! except JobNotSuspended: ! item['status'] = BES_OVERALL_STATE_RUNNING ! item['cpu_usage'] = 'Unknown' ! log.msg('Job REVOKED: %s'% item['grid_id']) ! item['mem_usage'] = 'Unknown' ! result = item['exec_job_id'] ! log.msg('The %s Exec service is comes back, the % job is already running!' % (item['exec_id'], item['grid_id'])) ! 
except RevokeError, error: ! log.msg('I cannot repair this job: %s, reason: %s' % (item['grid_id'], error)) ! item['status'] = BES_OVERALL_STATE_NEW ! except: ! log.error() ! item['status'] = BES_OVERALL_STATE_NEW ! if result == False: ! result = good_proxies[exec_id]['proxy'].start(item['desc']) ! if result != False: ! good_proxies[exec_id]['free_job_place'] -= 1 ! if good_proxies[exec_id]['free_job_place'] <= 0: ! del good_proxies[exec_id] ! print 'no more free_job_place on exec', exec_id ! break ! except AttributeError, error: ! log.error("One or more attribute failures from the JSDL: %s", error) ! item['status'] = BES_OVERALL_STATE_TERMINATED ! log.error('Job TERMINATED: %s' % item['grid_id']) ! except NotAcceptingNewActivities: ! result = False ! except InitialDirectoryDoesNotExist, error: ! log.error("The job's (%s) initialdirectory does not exist: %s" % (item['grid_id'],error)) ! item['error_message'] = 'The job\'s initialdir does not exist: %s' % error ! item['status'] = BES_OVERALL_STATE_TERMINATED ! log.error('Job FAILED: %s' % item['grid_id']) ! except: ! log.error() ! ! if result == False: ! print 'cannot submit job', item['grid_id'] ! else: ! item['exec_job_id'] = result ! item['desc']['job_migration'] = item['desc']['job_migration'] + 1 ! item['exec_id'] = exec_id ! item['exec_submit_time'] = time.time() ! item['status'] = BES_OVERALL_STATE_RUNNING ! log.msg('Job %s submitted to %s (job ID on Exec is %s).' \ ! % (item['grid_id'], item['exec_id'], item['exec_job_id'])) ! self.cmids.commit(item['grid_id']) except Exception, error: ! log.error() def _choice_proxy(self): """ --- 270,464 ---- _sched() """ ! if self.cm_run is True: ! print 'ClusterManager._sched is still running...' ! else: ! self.cm_run = True ! print 'ClusterManager._sched is starting...' ! self.dm.query_resources() self._get_exec_proxies() ! self._sched_status() ! ! map = self.dm.make_decisions(self.cmids) ! while len(map) > 0: ! try: ! for job_id in map.keys(): ! res = map[job_id].next() ! if self._sched_submit(self.cmids.get(job_id), res): ! del map[job_id] ! except: ! del map[job_id] ! self._sched_delete() ! print 'ClusterManager._sched is done.' ! self.cm_run = False ! #TODO: rewrite this method like in shed._sched_sbumit to achieving batch submit ! def _sched_submit(self, item, exec_id): ! result = False ! try: ! if self.dm.blacklisted(exec_id): ! raise Exception, 'Exec (%s) is on blacklist (%s), cannot submit job:' % \ ! (exec_id, self.dm.blacklist[exec_id]), item['grid_id'] ! if item['exec_job_id'] != False and item['desc']['batchjob'] == False and item['desc']['checkpoint'] == '1': ! try: ! item['status'] = BES_OVERALL_STATE_MIGRATION ! result = self.execes[exec_id].revoke_repair(item['exec_job_id'], item['desc']) ! if result == True: ! item['status'] = BES_OVERALL_STATE_RUNNING ! item['cpu_usage'] = 'Unknown' ! item['mem_usage'] = 'Unknown' ! result = item['exec_job_id'] ! if item['exec_id'] != exec_id: ! log.msg('The %s job MIGRATED from [Execid: %s] to [Execid: %s]' % (item['grid_id'], item['exec_id'], exec_id)) ! log.msg('Job REVOKED: %s'% item['grid_id']) ! except JobNotSuspended: ! item['status'] = BES_OVERALL_STATE_RUNNING ! item['cpu_usage'] = 'Unknown' ! log.msg('Job REVOKED: %s'% item['grid_id']) ! item['mem_usage'] = 'Unknown' ! result = item['exec_job_id'] ! log.msg('The %s Exec service is comes back, the % job is already running!' % (item['exec_id'], item['grid_id'])) ! except RevokeError, error: ! log.msg('I cannot repair this job: %s, reason: %s' % (item['grid_id'], error)) ! 
item['status'] = BES_OVERALL_STATE_NEW ! except: ! log.error() ! item['status'] = BES_OVERALL_STATE_NEW ! if result == False: ! result = self.execes[exec_id].start(item['desc']) ! except AttributeError, error: ! log.error("One or more attribute failures from the JSDL: %s", error) ! item['status'] = BES_OVERALL_STATE_TERMINATED ! log.error('Job TERMINATED: %s' % item['grid_id']) ! except NotAcceptingNewActivities, e: ! print 'Exec (%s) is rejected the job (%s)' % exec_id, nice_exception(e) ! self.dm.resource_list[exec_id][1]['FreeJobPlace'] -= 1 ! result = False ! except (socket.error, UnknownServiceError), error: ! print 'exec (%s) connection error:' % exec_id, nice_exception(error) ! self.dm.put_blacklist(exec_id) ! print 'exec (%s) is blacklisted' % exec_id ! except InitialDirectoryDoesNotExist, error: ! log.error("The job's (%s) initialdirectory does not exist: %s" % (item['grid_id'],error)) ! item['error_message'] = 'The job\'s initialdir does not exist: %s' % error ! item['status'] = BES_OVERALL_STATE_TERMINATED ! log.error('Job FAILED: %s' % item['grid_id']) except Exception, error: ! print 'cannot submit job (%s) : ' % item['grid_id'], nice_exception(error) ! ! if result == False: ! print 'cannot submit job', item['grid_id'] ! else: ! item['exec_job_id'] = result ! item['desc']['job_migration'] = item['desc']['job_migration'] + 1 ! item['exec_id'] = exec_id ! item['exec_submit_time'] = time.time() ! item['status'] = BES_OVERALL_STATE_RUNNING ! log.msg('Job %s submitted to %s (job ID on Exec is %s).' \ ! % (item['grid_id'], item['exec_id'], item['exec_job_id'])) ! self.cmids.commit(item['grid_id']) ! return result ! ! # def _sched(self): ! # """ ! # This procedure will schedule the jobs. It will ask for working proxies. ! # ! # _sched() ! # """ ! # ! # try: ! # self._sched_status() ! # self._sched_submit() ! # self._sched_delete() ! # self._get_exec_proxies() ! # self._timeout_test() ! # ! # good_proxies = self._choice_proxy() ! # if len(good_proxies) == 0: ! # return ! # ! # for item in self.cmids.select({'status' : BES_OVERALL_STATE_NEW}): ! # result = False ! # candidates = self._candidate_generator(good_proxies, item['grid_id']) ! # print 'list of candidates for job %s:' % item['grid_id'], candidates ! # ! # if len(candidates) == 0: ! # continue ! # ! # while len(candidates) > 0: ! # random.shuffle(candidates) ! # exec_id = candidates.pop() ! # if item['desc'].has_key('candidatehosts'): ! # if urlparse.urlparse(self.execes[exec_id]._url)[1].split(':')[0] \ ! # not in item['desc']['candidatehosts']: ! # print 'This host is not in the CandidateHosts list: %s' % self.execes[exec_id]._url ! # continue ! # try: ! # if item['exec_job_id'] != False and item['desc']['batchjob'] == False and item['desc']['checkpoint'] == '1': ! # try: ! # item['status'] = BES_OVERALL_STATE_MIGRATION ! # result = good_proxies[exec_id]['proxy'].revoke_repair(item['exec_job_id'], item['desc']) ! # if result == True: ! # item['status'] = BES_OVERALL_STATE_RUNNING ! # item['cpu_usage'] = 'Unknown' ! # item['mem_usage'] = 'Unknown' ! # result = item['exec_job_id'] ! # if item['exec_id'] != exec_id: ! # log.msg('The %s job MIGRATED from [Execid: %s] to [Execid: %s]' % (item['grid_id'], item['exec_id'], exec_id)) ! # log.msg('Job REVOKED: %s'% item['grid_id']) ! # except JobNotSuspended: ! # item['status'] = BES_OVERALL_STATE_RUNNING ! # item['cpu_usage'] = 'Unknown' ! # log.msg('Job REVOKED: %s'% item['grid_id']) ! # item['mem_usage'] = 'Unknown' ! # result = item['exec_job_id'] ! 
# log.msg('The %s Exec service is comes back, the % job is already running!' % (item['exec_id'], item['grid_id'])) ! # except RevokeError, error: ! # log.msg('I cannot repair this job: %s, reason: %s' % (item['grid_id'], error)) ! # item['status'] = BES_OVERALL_STATE_NEW ! # except: ! # log.error() ! # item['status'] = BES_OVERALL_STATE_NEW ! # ! # if result == False: ! # result = good_proxies[exec_id]['proxy'].start(item['desc']) ! # ! # if result != False: ! # good_proxies[exec_id]['free_job_place'] -= 1 ! # if good_proxies[exec_id]['free_job_place'] <= 0: ! # del good_proxies[exec_id] ! # print 'no more free_job_place on exec', exec_id ! # break ! # except AttributeError, error: ! # log.error("One or more attribute failures from the JSDL: %s", error) ! # item['status'] = BES_OVERALL_STATE_TERMINATED ! # log.error('Job TERMINATED: %s' % item['grid_id']) ! # except NotAcceptingNewActivities: ! # result = False ! # except InitialDirectoryDoesNotExist, error: ! # log.error("The job's (%s) initialdirectory does not exist: %s" % (item['grid_id'],error)) ! # item['error_message'] = 'The job\'s initialdir does not exist: %s' % error ! # item['status'] = BES_OVERALL_STATE_TERMINATED ! # log.error('Job FAILED: %s' % item['grid_id']) ! # except: ! # log.error() ! # ! # if result == False: ! # print 'cannot submit job', item['grid_id'] ! # else: ! # item['exec_job_id'] = result ! # item['desc']['job_migration'] = item['desc']['job_migration'] + 1 ! # item['exec_id'] = exec_id ! # item['exec_submit_time'] = time.time() ! # item['status'] = BES_OVERALL_STATE_RUNNING ! # log.msg('Job %s submitted to %s (job ID on Exec is %s).' \ ! # % (item['grid_id'], item['exec_id'], item['exec_job_id'])) ! # self.cmids.commit(item['grid_id']) ! # except Exception, error: ! # log.error() + #TODO: delete this, it is unused now def _choice_proxy(self): """ *************** *** 330,333 **** --- 502,506 ---- 'cpu_usage' : '-1', 'mem_usage':'-1', 'exit_code': None}) log.msg("New job: %s " % pid) + #print self.cmids.get(pid) return pid except UnknownActivityType, e: *************** *** 432,435 **** --- 605,611 ---- log.msg('Job deleted: %s' % job['grid_id']) continue + if self.dm.blacklisted(job['exec_id']): + print 'Exec (%s) is blacklisted (%s), cannot delete job' % \ + (job['exec_id'], self.dm.blacklist[job['exec_id ']]), job['grid_id'] retries = 3 while retries > 0: *************** *** 516,539 **** cpunum = 0 freecpunum = 0 ! for item in self.gis.get({'type' : 'Exec'}): try: ! data = SimpleXML(memory = item[1]) ! if data.get_content('/ServiceDescription/ClusterElement/CPUArchitecture')[0].strip() \ ! != self.arch and \ ! data.get_content('/ServiceDescription/ClusterElement/OperatingSystem')[0].strip() \ ! != self.os: ! continue ! try: ! client = get_working_client(item[0]['urls']) ! except: ! continue ! execes[item[0]['sid']] = client ! data = SimpleXML(memory = item[1]) ! cpucount = int(data.get_content('/ServiceDescription/ClusterElement/CPUCount')[0]) ! cpunum = cpunum + cpucount ! self.cpucount[item[0]['sid']] = cpucount ! freecpunum = freecpunum + int(data.get_content('/ServiceDescription/ClusterElement/FreeCpuNum')[0]) except Exception, e: ! print 'error on parsing Exec provision data', nice_exception(e) execeslog = ', '.join([e._url for e in self.execes.values()]) if execeslog != self.lastexeceslog: --- 692,706 ---- cpunum = 0 freecpunum = 0 ! for item in self.dm.resource_list.values(): try: ! client = get_client(item[0]['urls'][0]) except Exception, e: ! 
print 'Cannot reach exec (%s):' % item[0]['sid'], nice_exception(e) ! continue ! execes[item[0]['sid']] = client ! data = item[1] ! cpucount = int(data['CPUCount']) ! cpunum = cpunum + cpucount ! freecpunum = freecpunum + int(data['FreeCpuNum']) execeslog = ', '.join([e._url for e in self.execes.values()]) if execeslog != self.lastexeceslog: *************** *** 550,563 **** try: out = create_desc_from_jsdl(jsdl_str) ! print out #out['walltimelimit'] = 1209600 # 2 weeks out['ckpt_period'] = self.CKPTPeriod out['directory_structure'] = directory_structure out['job_migration'] = 0 ! print out except Exception, error: log.msg('Unfatal exception until parse jsdl: %s' % str(error)) ! print 'submit description:', out return out --- 717,731 ---- try: out = create_desc_from_jsdl(jsdl_str) ! #print out #out['walltimelimit'] = 1209600 # 2 weeks out['ckpt_period'] = self.CKPTPeriod out['directory_structure'] = directory_structure out['job_migration'] = 0 ! #print out except Exception, error: log.msg('Unfatal exception until parse jsdl: %s' % str(error)) + raise ! #print 'submit description:', out return out *************** *** 659,663 **** out[item]['completed'] = str(len(self.cmids.select({'status' : BES_OVERALL_STATE_COMPLETED, 'exec_id' : item}))) out[item]['jobnum'] = str(len(self.cmids.select({'exec_id' : item}))) ! out[item]['cpunum'] = self.cpucount[item] out[item]['new'] = str(len(self.cmids.select({'status' : BES_OVERALL_STATE_NEW, 'exec_id' : item}))) out[item]['terminated'] = str(len(self.cmids.select({'status' : BES_OVERALL_STATE_TERMINATED, 'exec_id' : item}))) --- 827,831 ---- out[item]['completed'] = str(len(self.cmids.select({'status' : BES_OVERALL_STATE_COMPLETED, 'exec_id' : item}))) out[item]['jobnum'] = str(len(self.cmids.select({'exec_id' : item}))) ! out[item]['cpunum'] = self.dm.resource_list[item][1]['CPUCount'] out[item]['new'] = str(len(self.cmids.select({'status' : BES_OVERALL_STATE_NEW, 'exec_id' : item}))) out[item]['terminated'] = str(len(self.cmids.select({'status' : BES_OVERALL_STATE_TERMINATED, 'exec_id' : item}))) |
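The rewritten _sched above delegates placement to the Decision Maker: query_resources() refreshes the cached resource data, make_decisions() returns a map from job id to a generator of candidate resources, and _sched_submit() is tried against successive candidates until one accepts the job. A minimal sketch of that driving loop, with the queue, Decision Maker and submit callable passed in as stand-ins rather than the real GUG objects:

    # Sketch of the scheduling cycle shape used in the patch above; dm, queue and
    # submit are stand-ins for the real DecisionMaker, IDStore and _sched_submit.
    def run_sched_cycle(dm, queue, submit):
        dm.query_resources()                    # refresh cached resource data from GIS
        decisions = dm.make_decisions(queue)    # {job_id: generator of candidate resources}
        while len(decisions) > 0:
            for job_id in decisions.keys():
                try:
                    resource = decisions[job_id].next()   # next candidate (Python 2 generators)
                except StopIteration:
                    del decisions[job_id]                 # candidates exhausted for this job
                    continue
                if submit(queue.get(job_id), resource):
                    del decisions[job_id]                 # accepted: stop trying candidates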
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:04
|
Update of /cvsroot/gug/gug/gug/service/stm In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/service/stm Modified Files: stm.py stm.conf.default Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM Index: stm.conf.default =================================================================== RCS file: /cvsroot/gug/gug/gug/service/stm/stm.conf.default,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** stm.conf.default 30 Jan 2007 18:24:29 -0000 1.5 --- stm.conf.default 27 Sep 2007 10:36:05 -0000 1.6 *************** *** 2,5 **** --- 2,9 ---- <ServiceConfig> <RefreshInterval>10</RefreshInterval> + <DecisionMaker> + <ResourceInterface>gug.module.dm.resource.storage.StorageManager</ResourceInterface> + <Algorithm>gug.module.dm.algorithm.simple.RandomOrder</Algorithm> + </DecisionMaker> <PlugIn> <Class>gug.service.stm.plugins.file.File</Class> Index: stm.py =================================================================== RCS file: /cvsroot/gug/gug/gug/service/stm/stm.py,v retrieving revision 1.53 retrieving revision 1.54 diff -C2 -d -r1.53 -r1.54 *** stm.py 27 Jul 2007 14:15:45 -0000 1.53 --- stm.py 27 Sep 2007 10:36:05 -0000 1.54 *************** *** 18,21 **** --- 18,23 ---- from gug.host.timed import Task from gug.client.soap import get_client, get_working_client + from gug.module.dm.dm import DecisionMaker + from gug.common.idstore import AbstractQueue class StorageManager: *************** *** 46,49 **** --- 48,54 ---- Task(self.id, self._refresh).start(period, now = True, \ now_interval = 3) + #Initializing Decision Maker + dm_cfg = config.get_dictionaries('/ServiceConfig/DecisionMaker')[0] + self.dm = DecisionMaker(self.gis, dm_cfg) plugins = config.get_multi_dictionaries('/ServiceConfig/PlugIn') self.pluginof = {} *************** *** 71,94 **** except: log.error() ! try: ! stcs = self.gis.get({'type' : 'StorageController'}) ! self.stcs = [(meta['urls'], data) \ ! for (meta, data) in stcs] ! print '_refresh STC:', [urls for (urls, _) in self.stcs] ! except: ! log.error() def _stc(self, min_space = 0): ! try: ! stcs = [] ! for urls, data in self.stcs: ! xml = SimpleXML(memory=data) ! if int(xml.get_content('//DataStorageFree',[0])[0]) > min_space: ! stcs.extend(urls) ! return get_working_client(stcs) ! except: ! log.error() ! raise NoServiceKnown, 'No StC with enough space known' ! def _fss(self): try: --- 76,98 ---- except: log.error() ! self.dm.query_resources() ! print '_refresh STC:', self.dm.resource_list.keys() def _stc(self, min_space = 0): ! queue = AbstractQueue() ! queue.add(min_space) ! map = self.dm.make_decisions(queue) ! while len(map) > 0: ! try: ! for id in map.keys(): ! res = map[id].next() ! return get_working_client(self.dm.resource_list[res][0]['urls']) ! except StopIteration: ! del map[id] ! except Exception, e: ! print 'Error to connect stc (%s):' % res, e ! self.dm.put_blacklist(res) ! raise NoServiceKnown, 'No StC with enough space known' ! def _fss(self): try: |
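Both services construct the Decision Maker from the new <DecisionMaker> config element, whose two children name a resource-interface class and an ordering-algorithm class. dm.py itself is not part of this commit batch, so the wiring below is only a guess at its shape, reusing the import_class_from_string helper the services already use for pluggable backends; the backend constructors do take (dm, gis_proxy, config), as in the abstract.py and simple.py files added later in this batch:

    # Hypothetical sketch of how the <DecisionMaker> config keys might be wired up
    # inside gug.module.dm.dm.DecisionMaker (not shown in this batch).
    from gug.common.share import import_class_from_string

    class DecisionMakerSketch:
        def __init__(self, gis_proxy, config):
            # config comes from get_dictionaries('/ServiceConfig/DecisionMaker')[0], e.g.
            # {'ResourceInterface': 'gug.module.dm.resource.storage.StorageManager',
            #  'Algorithm': 'gug.module.dm.algorithm.simple.RandomOrder'}
            self.resource_list = {}
            self.blacklist = {}
            resource_class = import_class_from_string(config['ResourceInterface'])
            algorithm_class = import_class_from_string(config['Algorithm'])
            self.resource = resource_class(self, gis_proxy, config)
            self.algorithm = algorithm_class(self, gis_proxy, config)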
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:03
|
Update of /cvsroot/gug/gug/gug/module/dm/algorithm In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/module/dm/algorithm Added Files: simple.py __init__.py job_data.py Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM --- NEW FILE: job_data.py --- '''Simple ordering algrithms''' __revision__ = '$Revision: 1.1 $' from gug.module.dm.algorithm.simple import AbstractOrder from gug.client.storage import Storage from gug.client.soap import get_client from gug.common.jsdl import get_staging_info from gug.common.simplexml import SimpleXML, SimpleXMLError from gug.common import log from gug.common.exception import * import urlparse class JSDLDataStageOrdering(AbstractOrder): def __init__(self, dm, gis_proxy, config): self.dm = dm self.gis_proxy = gis_proxy self.config = config self.storage = Storage(gis_proxy = gis_proxy) def _crop_file(self,path): if path.startswith('file://'): path = path[7:] return path def order(self, csg, jsdl): try: stcs = {} stageins = self._get_stage_in_data_from_jsdl(jsdl) for stage in stageins.values(): try: stage_dirs = self.storage.ls(self._crop_file(stage), True) for (dir, datas) in stage_dirs: for data in datas: if data.has_key('surl'): for surl in data['surl']: #host = urlparse.urlparse(surl)[1].split(':')[0] host = surl.split('-')[0] stcs[host] = stcs.get(host, 0) + 1 except NotDirectoryLikeException: for surl in self.storage.stat(self._crop_file(stage))['surl']: #host = urlparse.urlparse(surl)[1].split(':')[0] host = surl.split('-')[0] stcs[host] = stcs.get(host, 0) + 1 print stcs try: yield csg.next() except: return except: log.error() def _get_stage_in_data_from_jsdl(self, jsdl): stage_in, stage_out = get_staging_info(jsdl) print stage_in print stage_out return stage_in --- NEW FILE: simple.py --- '''Simple ordering algrithms''' __revision__ = '$Revision: 1.1 $' import random class AbstractOrder: def __init__(self, dm, gis_proxy, config): self.dm = dm self.gis_proxy = gis_proxy self.config = config def order(self, csg, filter = None): try: yield csg.next() except: return class RandomOrder(AbstractOrder): '''Random ordering''' def order(self, csg, filter = None): ret = [] try: while True: ret = [] for i in range(3): ret.append(csg.next()) random.shuffle(ret) for i in ret: yield i except: random.shuffle(ret) for i in ret: yield i class MaximumFreeSpaceOrder(AbstractOrder): '''Maximum Free Space ordering''' is_ordered = False def _max_space_order_cache(self): for (key, value) in self.dm.resource_list.iteritems(): print self.dm.resource_list[key][1]['DataStorageFree'] key.sort(self.dm.resource_list[key][1]['DataStorageFree']) def order(self, csg, filter = None): self._max_space_order_cache() ret = [] try: while True: ret = [] for i in range(3): ret.append(csg.next()) random.shuffle(ret) for i in ret: yield i except: random.shuffle(ret) for i in ret: yield i --- NEW FILE: __init__.py --- '''Ordering algorithms for DM''' __revision__ = '$Revision: 1.1 $' |
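RandomOrder wraps a candidate-set generator and re-yields its items in shuffled groups of three, flushing whatever is left over once the source runs out. A small usage illustration; the dm, gis_proxy and config arguments are unused by this particular algorithm, so None is passed:

    # RandomOrder pulls candidates in groups of three, shuffles each group, and
    # yields them; the final shorter group is shuffled and flushed on exhaustion.
    from gug.module.dm.algorithm.simple import RandomOrder

    def candidate_set():
        for res in ['exec-a', 'exec-b', 'exec-c', 'exec-d', 'exec-e']:
            yield res

    ordering = RandomOrder(None, None, None)       # dm, gis_proxy, config unused here
    for candidate in ordering.order(candidate_set()):
        print candidate                            # five names, shuffled within each group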
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:02
|
Update of /cvsroot/gug/gug/gug/module/dm/resource In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/module/dm/resource Added Files: __init__.py abstract.py storage.py computing.py Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM --- NEW FILE: computing.py --- ''' Computing Resource backend''' __revision__ = '$Revision: 1.1 $' from gug.common import log from gug.common.jsdl import * from gug.common.jsdl_parser import * from gug.common.share import nice_exception from gug.common.ogsa_bes import * from gug.common.simplexml import SimpleXML, SimpleXMLError from gug.client.soap import get_client from gug.module.dm.resource.abstract import AbstractResource import time import urlparse class ComputingResource(AbstractResource): exact_test = ['operatingsystem', 'architecture'] number_test = ['totalcpucount', 'totalphysicalmemory', 'totalvirtualmemory', 'checkpoint', \ 'exclusiveexecution'] def _query_resources(self, resource_list, type): """ Get Computing Element resources from local GIS """ try: gis_list = self.gis_proxy.get({'resourcetype' : type}) except Exception, error: log.error() for res in gis_list: meta_data = res[0] id = meta_data['sid'] meta_data['timestamp'] = time.time() if resource_list.has_key(id) and \ int(resource_list[id][0]['timelife']) >= int(meta_data['timelife']): print 'resource (%s) timelife (%s) is smaller than in the cache (%s)' % \ (id, meta_data['timelife'], resource_list[id][0]['timelife']) continue data = res[1] try: service_desc = SimpleXML(memory = data) path = '/ServiceDescription/' + type ce_data = service_desc.get_dictionaries(path)[0] if not resource_list.has_key(id) or not resource_list[id][1]: log.msg('Computing Element data arrived from %s.' % id) except SimpleXMLError, error: log.error('No Computing Element data from %s.' % id) continue try: path = '/ServiceDescription/Capabilities' cap_data = service_desc.get_dictionaries(path)[0] except SimpleXMLError, error: cap_data = {} resource_list[id] = (meta_data, ce_data, cap_data) print 'available resources:', ', '.join(resource_list.keys()) '''Check the resource that it meets the job's requirements''' def _test_candidate(self, job_desc, res_desc): if job_desc.has_key('candidatehosts'): hosts = [url for url in res_desc['urls'] if urlparse.urlparse(url)[1].split(':')[0] in job_desc['candidatehosts']] if len(hosts) == 0: print 'candidate (%s) failed because of: CandidateHosts:' \ % res_desc['name'], job_desc['candidatehosts'] return False for i in self.exact_test: if job_desc.has_key(i) and res_desc.has_key(i): if job_desc[i].upper() != '*' and job_desc[i].upper() != res_desc[i].upper(): print 'candidate (%s) failed because of:' % res_desc['name'], i return False for i in self.number_test: if job_desc.has_key(i) and res_desc.has_key(i): if float(job_desc[i]) > float(res_desc[i]): print 'candidate (%s) failed because of:' % res_desc['name'], i return False return True class ClusterManager(ComputingResource): def change_states(self, ids): jobs = ids.select({'status' : BES_OVERALL_STATE_UNKNOWN }) for job in jobs: if time.time() - job['last_check_time'] >= self.config['TimeOut']: log.msg('TIMEOUT! 
Job reschedule: %s' % job['grid_id']) job['status'] = BES_OVERALL_STATE_NEW ids.commit(job['grid_id']) def query_resources(self): self._query_resources(self.dm.resource_list, 'ClusterElement') for res in self.dm.resource_list.keys(): if self.dm.resource_list[res][1]['OperatingSystem'].upper() != self.config['OperatingSystem'].upper() or\ self.dm.resource_list[res][1]['CPUArchitecture'].upper() != self.config['Architecture'].upper(): del self.dm.resource_list[res] continue try: proxy = get_client(self.dm.resource_list[res][0]['urls'][0]) self.dm.resource_list[res][1]['FreeJobPlace'] = proxy.free_job_place() except Exception, error: 'cannot get exec\'s (%s) freejobplace:' % res, nice_exception(error) def get_scheduling_items(self, ids): return ids.select({ 'status' : BES_OVERALL_STATE_NEW }) def generate_candidate_set(self, job): job_desc = job['desc'] for res in self.dm.resource_list.keys(): res_desc = self._create_desc(self.dm.resource_list[res]) if not self.dm.blacklisted(res) and self._test_candidate(job_desc, res_desc) and\ self.dm.resource_list[res][1].has_key('FreeJobPlace') and\ self.dm.resource_list[res][1]['FreeJobPlace'] > 0: yield res return def get_next_candidate(self, candidate, id): actual = None try: while True: actual = candidate.next() self.dm.resource_list[actual][1]['FreeJobPlace'] = \ self.dm.resource_list[actual][1]['FreeJobPlace'] - 1 yield actual self.dm.resource_list[actual][1]['FreeJobPlace'] = \ self.dm.resource_list[actual][1]['FreeJobPlace'] + 1 except: pass return def _create_desc(self, res): desc = {} desc['name'] = res[0]['sid'] desc['urls'] = res[0]['urls'] desc['operatingsystem'] = res[1]['OperatingSystem'] desc['architecture'] = res[1]['CPUArchitecture'] desc['totalcpucount'] = res[1]['CPUCount'] desc['totalphysicalmemory'] = res[1]['FreePhysicalMemory'] desc['totalvirtualmemory'] = res[1]['FreeVirtualMemory'] desc['checkpoint'] = res[1]['CheckPoint'] if int(res[1]['ProcessPerCPU']) == 1: desc['exclusiveexecution'] = 1 else: desc['exclusiveexecution'] = 0 return desc class SuperScheduler(ComputingResource): def query_resources(self): self._query_resources(self.dm.resource_list, 'ComputingElement') def generate_candidate_set(self, job): job_desc = create_desc_from_jsdl(job['jsdl']) for res in self.dm.resource_list.keys(): res_desc = self._create_desc(self.dm.resource_list[res]) if not self.dm.blacklisted(res) and self._test_candidate(job_desc, res_desc) and\ self.dm.resource_list[res][1].has_key('FreeCpuNum') and\ (int(self.dm.resource_list[res][1]['FreeCpuNum']) - int(self.dm.resource_list[res][1]['NewJobNumber'])) > 0: yield res return def get_scheduling_items(self, ids): return ids.select({ 'state' : BES_OVERALL_STATE_NEW }) def get_next_candidate(self, candidate, id): actual = None try: while True: actual = candidate.next() self.dm.resource_list[actual][1]['FreeCpuNum'] = \ str(int(self.dm.resource_list[actual][1]['FreeCpuNum']) - 1) yield actual self.dm.resource_list[actual][1]['FreeCpuNum'] = \ str(int(self.dm.resource_list[actual][1]['FreeCpuNum']) + 1) except: pass #if actual: # self.dm.resource_list[actual][0]['FreeCpuNum'] = \ # str(int(self.dm.resource_list[actual][0]['FreeCpuNum']) + 1) return def get_resource_filter(self, id): return id['jsdl'] def _create_desc(self, res): desc = {} desc['name'] = res[0]['sid'] desc['urls'] = res[0]['urls'] desc['operatingsystem'] = res[1]['OperatingSystem'] desc['architecture'] = res[1]['Architecture'] desc['totalcpucount'] = res[1]['CpuNum'] return desc --- NEW FILE: abstract.py --- ''' Abtrac Resource backend 
interface''' __revision__ = '$Revision: 1.1 $' from gug.common.idstore import AbstractQueue class AbstractResource: '''DM resource backend's base Class''' def __init__(self, dm, gis_proxy, config): self.gis_proxy = gis_proxy self.dm = dm self.config = config '''It will change the queue's elements states if needed''' def change_states(self, queue): pass '''Queries resources from local GIS''' def query_resources(self): pass '''Returns lost if those ids, which should be scheduled''' def get_scheduling_items(self, queue): return queue.get_all() '''Returns a candidate set generator object, that will iterates the good resources that meets the queue id's requirements''' def generate_candidate_set(self, id): pass '''Returns a generator object, which will get the next candidate and change there's attributes if needed''' def get_next_candidate(self, candidate, id): pass def get_resource_filter(self, id): return None --- NEW FILE: storage.py --- ''' Storage Resource backend''' __revision__ = '$Revision: 1.1 $' from gug.common import log from gug.common.share import nice_exception from gug.common.simplexml import SimpleXML, SimpleXMLError from gug.client.soap import get_client from gug.module.dm.resource.abstract import AbstractResource import time class StorageManager(AbstractResource): def change_states(self, queue): pass def query_resources(self): """ Get Storage Element resources from local GIS """ try: gis_list = self.gis_proxy.get({'resourcetype' : 'StorageElement'}) except Exception, error: log.error() for res in gis_list: meta_data = res[0] id = meta_data['sid'] meta_data['timestamp'] = time.time() if self.dm.resource_list.has_key(id) and \ int(self.dm.resource_list[id][0]['timelife']) >= int(meta_data['timelife']): print 'resource (%s) timelife (%s) is smaller than in the cache (%s)' % \ (id, meta_data['timelife'], self.dm.resource_list[id][0]['timelife']) continue data = res[1] try: service_desc = SimpleXML(memory = data) path = '/ServiceDescription/StorageElement' se_data = service_desc.get_dictionaries(path)[0] if not self.dm.resource_list.has_key(id) or not self.dm.resource_list[id][1]: log.msg('Storage Element data arrived from %s.' % id) except SimpleXMLError, error: se_data = {} log.error('No Storage Element data from %s.' % id) self.dm.resource_list[id] = (meta_data, se_data) print 'available resources:', ', '.join(self.dm.resource_list.keys()) def get_scheduling_items(self, queue): return queue.get_all() def generate_candidate_set(self, id): for res in self.dm.resource_list.keys(): if not self.dm.blacklisted(res) and \ int(self.dm.resource_list[res][1]['DataStorageFree']) > id: yield res return def get_next_candidate(self, candidate, id): actual = None try: while True: actual = candidate.next() self.dm.resource_list[actual][1]['DataStorageFree'] = \ str(int(self.dm.resource_list[actual][1]['DataStorageFree']) - id) yield actual self.dm.resource_list[actual][1]['DataStorageFree'] = \ str(int(self.dm.resource_list[actual][1]['DataStorageFree']) + id) except: return --- NEW FILE: __init__.py --- '''Resource Interface backends for DM''' __revision__ = '$Revision: 1.1 $' |
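The backends above all follow the AbstractResource contract: query_resources() caches (meta_data, service_data) tuples from the GIS in dm.resource_list, generate_candidate_set() yields the resources that satisfy one queue item, and get_next_candidate() wraps that stream with reserve/release bookkeeping. A minimal skeleton of a further backend against the same interface; the FreeSlots key is invented for the example and is not published by any existing GUG service:

    # Minimal sketch of a custom Decision Maker resource backend against the
    # interface in abstract.py. 'FreeSlots' is an invented attribute, kept as an
    # integer here purely for illustration.
    from gug.module.dm.resource.abstract import AbstractResource

    class ExampleResource(AbstractResource):
        def query_resources(self):
            # would normally fill self.dm.resource_list from self.gis_proxy.get(...)
            pass

        def generate_candidate_set(self, item):
            for res in self.dm.resource_list.keys():
                if not self.dm.blacklisted(res) and \
                   self.dm.resource_list[res][1]['FreeSlots'] > 0:
                    yield res

        def get_next_candidate(self, candidate, item):
            try:
                while True:
                    actual = candidate.next()
                    self.dm.resource_list[actual][1]['FreeSlots'] -= 1   # reserve
                    yield actual
                    self.dm.resource_list[actual][1]['FreeSlots'] += 1   # release when rejected
            except StopIteration:
                return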
From: Adrian T. <cs...@us...> - 2007-09-27 10:35:34
|
Update of /cvsroot/gug/gug/gug/module/dm In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23441/gug/module/dm Log Message: Directory /cvsroot/gug/gug/gug/module/dm added to the repository |
From: Adrian T. <cs...@us...> - 2007-09-27 10:35:34
|
Update of /cvsroot/gug/gug/gug/module In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23441/gug/module Log Message: Directory /cvsroot/gug/gug/gug/module added to the repository |
From: Adrian T. <cs...@us...> - 2007-09-27 10:35:34
|
Update of /cvsroot/gug/gug/gug/module/dm/resource In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23441/gug/module/dm/resource Log Message: Directory /cvsroot/gug/gug/gug/module/dm/resource added to the repository |
From: Adrian T. <cs...@us...> - 2007-09-27 10:35:34
|
Update of /cvsroot/gug/gug/gug/module/dm/algorithm In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23441/gug/module/dm/algorithm Log Message: Directory /cvsroot/gug/gug/gug/module/dm/algorithm added to the repository |
From: Roczei G. <ro...@us...> - 2007-09-19 14:12:32
|
Update of /cvsroot/gug/gug/gug/service/exec In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv2628 Modified Files: exec.py Log Message: bugfix: error_message=The job's initialdir does not exist: /home/gug/jobroot/2cd8ecd0-f350-4bf4-8ecb-90becc9a67b0/jobdir Index: exec.py =================================================================== RCS file: /cvsroot/gug/gug/gug/service/exec/exec.py,v retrieving revision 1.56 retrieving revision 1.57 diff -C2 -d -r1.56 -r1.57 *** exec.py 19 Sep 2007 10:45:21 -0000 1.56 --- exec.py 19 Sep 2007 14:12:24 -0000 1.57 *************** *** 58,63 **** executable = desc['executable'] ! if not path.exists(desc['initialdir']): ! raise InitialDirectoryDoesNotExist, desc['initialdir'] if not str(executable).startswith('/'): --- 58,70 ---- executable = desc['executable'] ! retries = 10 ! while True: ! if not path.exists(desc['initialdir']): ! time.sleep(2) ! retries = retries - 1 ! if retries < 0: ! raise InitialDirectoryDoesNotExist, desc['initialdir'] ! else: ! break if not str(executable).startswith('/'): |
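The fix polls for the job's initial directory instead of failing on the first look, covering the window in which the directory is still being created (for example over a shared filesystem). The same pattern as a free-standing helper, for illustration only; wait_for_path is not part of the GUG code base, and the retry budget simply mirrors the 10 x 2 s of the patch:

    # Illustrative retry helper echoing the loop in the patch above: poll the path,
    # sleep between attempts, and give up once the retry budget is spent.
    import os
    import time

    def wait_for_path(path, retries=10, delay=2):
        for attempt in range(retries + 1):
            if os.path.exists(path):
                return True
            if attempt < retries:
                time.sleep(delay)
        raise OSError('path did not appear in time: %s' % path)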
From: Roczei G. <ro...@us...> - 2007-09-19 10:45:25
|
Update of /cvsroot/gug/gug/gug/service/exec In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv19382 Modified Files: exec.py Log Message: bugfix: UnboundLocalError: local variable 'proc' referenced before assignment Index: exec.py =================================================================== RCS file: /cvsroot/gug/gug/gug/service/exec/exec.py,v retrieving revision 1.55 retrieving revision 1.56 diff -C2 -d -r1.55 -r1.56 *** exec.py 13 May 2007 21:54:28 -0000 1.55 --- exec.py 19 Sep 2007 10:45:21 -0000 1.56 *************** *** 143,148 **** log.error() ! while proc == None: ! time.sleep(1) self.my_exec.pool[self.poolid].proc = proc --- 143,154 ---- log.error() ! while True: ! try: ! #bugfix: UnboundLocalError: local variable 'proc' referenced before assignment ! if proc == None: ! time.sleep(1) ! break ! except: ! continue self.my_exec.pool[self.poolid].proc = proc |
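The error in the log message is the usual consequence of binding a name only inside a try block: if the assignment raises, any later use of the name fails with UnboundLocalError. A short illustration of the failure mode, independent of GUG (pre-binding the name, e.g. proc = None before the try, is the common way to avoid it):

    # Illustration of the UnboundLocalError named in the commit message: 'proc' is
    # assigned only inside the try block, so when that call raises, the later use
    # of the name fails.
    def start(launcher):
        try:
            proc = launcher()    # if this raises, 'proc' is never bound
        except Exception:
            pass
        return proc              # UnboundLocalError when launcher() raised

    # start(lambda: 1 / 0)  ->  UnboundLocalError: local variable 'proc' referenced before assignment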
From: Nagy Z. <zs...@us...> - 2007-07-27 14:15:47
|
Update of /cvsroot/gug/gug/gug/service/stm In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv16867/gug/service/stm Modified Files: stm.py Log Message: bugfix Index: stm.py =================================================================== RCS file: /cvsroot/gug/gug/gug/service/stm/stm.py,v retrieving revision 1.52 retrieving revision 1.53 diff -C2 -d -r1.52 -r1.53 *** stm.py 30 Jan 2007 11:10:39 -0000 1.52 --- stm.py 27 Jul 2007 14:15:45 -0000 1.53 *************** *** 86,90 **** if int(xml.get_content('//DataStorageFree',[0])[0]) > min_space: stcs.extend(urls) ! return get_working_client(stcs) except: log.error() --- 86,90 ---- if int(xml.get_content('//DataStorageFree',[0])[0]) > min_space: stcs.extend(urls) ! return get_working_client(stcs) except: log.error() |
From: Roczei G. <ro...@us...> - 2007-07-24 22:51:19
|
Update of /cvsroot/gug/gug/etc In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv22730 Modified Files: stunnel.conf Log Message: bugfix: gridsite DNS error comment: storage tunnel Index: stunnel.conf =================================================================== RCS file: /cvsroot/gug/gug/etc/stunnel.conf,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** stunnel.conf 24 Jul 2007 18:45:21 -0000 1.1 --- stunnel.conf 24 Jul 2007 22:51:17 -0000 1.2 *************** *** 50,54 **** [gridsite] accept = localhost:30000 ! connect = gridsite.grid.niif.hu:21111 ;[storage] --- 50,55 ---- [gridsite] accept = localhost:30000 ! ;connect = gridsite.grid.niif.hu:21111 ! connect = 193.225.13.189:21111 ;[storage] |
From: Roczei G. <ro...@us...> - 2007-07-24 18:45:21
|
Update of /cvsroot/gug/gug/etc In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv11141 Added Files: stunnel.conf Log Message: stunnel4 config HTTP HTTP HTTPS GUG client <-------> Eclipse TCP/IP monitor <-----> stunnel4 <------> GUG server --- NEW FILE: stunnel.conf --- ; Sample stunnel configuration file by Michal Trojnara 2002-2006 ; Some options used here may not be adequate for your particular configuration ; Please make sure you understand them (especially the effect of chroot jail) ; Certificate/key is needed in server mode and optional in client mode cert = /etc/stunnel/roadwarrior_cert.pem key = /etc/stunnel/roadwarrior_key.pem ; Protocol version (all, SSLv2, SSLv3, TLSv1) sslVersion = SSLv3 ; Some security enhancements for UNIX systems - comment them out on Win32 chroot = /var/lib/stunnel4/ setuid = stunnel4 setgid = stunnel4 ; PID is created inside chroot jail pid = /stunnel4.pid ; Some performance tunings socket = l:TCP_NODELAY=1 socket = r:TCP_NODELAY=1 ;compression = rle ; Workaround for Eudora bug ;options = DONT_INSERT_EMPTY_FRAGMENTS ; Authentication stuff ;verify = 2 ; Don't forget to c_rehash CApath ; CApath is located inside chroot jail ;CApath = /certs ; It's often easier to use CAfile CAfile = /etc/stunnel/gugca_cert.pem ; Don't forget to c_rehash CRLpath ; CRLpath is located inside chroot jail ;CRLpath = /crls ; Alternatively you can use CRLfile ;CRLfile = /etc/stunnel/crls.pem ; Some debugging stuff useful for troubleshooting debug = 7 output = /var/log/stunnel4/stunnel.log ; Use it for client mode client = yes ; Service-level configuration [gridsite] accept = localhost:30000 connect = gridsite.grid.niif.hu:21111 ;[storage] ;accept = localhost:50000 ;connect = storage.grid.niif.hu:21111 ; vim:ft=dosini |
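With the [gridsite] stanza active, stunnel terminates SSL locally, so a plain-HTTP GUG client (optionally routed through the Eclipse TCP/IP monitor) only needs to target localhost:30000. A rough illustration using the project's own SOAP client helper; the /GIS path and the get() call are assumptions borrowed from the gis.php test further down in this archive, and may differ per deployment:

    # Pointing a plain-HTTP client at the local stunnel endpoint instead of the
    # remote HTTPS service. The /GIS path and get() signature are assumptions taken
    # from the gis.php test, not guaranteed by this configuration.
    from gug.client.soap import get_client

    gis = get_client('http://localhost:30000/GIS')
    print gis.get({'type': 'ClusterManager'})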
From: Roczei G. <ro...@us...> - 2007-07-23 08:30:33
|
Update of /cvsroot/gug/gug/etc In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv9472 Added Files: roadwarrior.jks Log Message: Roadwarrior's java key store http://www.agentbob.info/agentbob/79.html alias=roadwarrior password=roadwarrior --- NEW FILE: roadwarrior.jks --- (This appears to be a binary file; contents omitted.) |
From: Roczei G. <ro...@us...> - 2007-06-26 14:51:52
|
Update of /cvsroot/gug/gug/gug/host/handler In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv16206 Modified Files: soap.py Log Message: SOAP debug off Index: soap.py =================================================================== RCS file: /cvsroot/gug/gug/gug/host/handler/soap.py,v retrieving revision 1.19 retrieving revision 1.20 diff -C2 -d -r1.19 -r1.20 *** soap.py 26 Jun 2007 14:07:48 -0000 1.19 --- soap.py 26 Jun 2007 14:51:50 -0000 1.20 *************** *** 63,68 **** """ try: ! if 'http://gug.grid.niif.hu/schemas/2007/06' in data: ! log.msg('SOAP request:', data) # parse the SOAP message using ZSI's ParsedSoap class --- 63,68 ---- """ try: ! #if 'http://gug.grid.niif.hu/schemas/2007/06' in data: ! # log.msg('SOAP request:', data) # parse the SOAP message using ZSI's ParsedSoap class *************** *** 84,89 **** arguments = ps.Parse(MyAny(nillable = True)) ! if 'http://gug.grid.niif.hu/schemas/2007/06' in data: ! log.msg('arguments:', arguments) if isinstance(arguments,dict): --- 84,89 ---- arguments = ps.Parse(MyAny(nillable = True)) ! #if 'http://gug.grid.niif.hu/schemas/2007/06' in data: ! # log.msg('arguments:', arguments) if isinstance(arguments,dict): *************** *** 98,103 **** response = str(SoapWriter().serialize({'Response': result}, MyAny(nillable = True))) ! if 'http://gug.grid.niif.hu/schemas/2007/06' in data: ! log.msg('SOAP response:', response) except Exception, e: --- 98,103 ---- response = str(SoapWriter().serialize({'Response': result}, MyAny(nillable = True))) ! #if 'http://gug.grid.niif.hu/schemas/2007/06' in data: ! # log.msg('SOAP response:', response) except Exception, e: |
From: Roczei G. <ro...@us...> - 2007-06-26 14:07:50
|
Update of /cvsroot/gug/gug/gug/host/handler In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv31333 Modified Files: soap.py Log Message: SOAP debug: request/arguments/response Index: soap.py =================================================================== RCS file: /cvsroot/gug/gug/gug/host/handler/soap.py,v retrieving revision 1.18 retrieving revision 1.19 diff -C2 -d -r1.18 -r1.19 *** soap.py 6 Jun 2007 14:21:12 -0000 1.18 --- soap.py 26 Jun 2007 14:07:48 -0000 1.19 *************** *** 63,70 **** """ try: # parse the SOAP message using ZSI's ParsedSoap class ps = ParsedSoap(data, readerclass = ExpatReaderClass) # the root node of the SOAP message is the method_name ! method_name = ps.body_root.nodeName if hasattr(self.servo, method_name): # if the served_object has a method called 'method_name' --- 63,73 ---- """ try: + if 'http://gug.grid.niif.hu/schemas/2007/06' in data: + log.msg('SOAP request:', data) + # parse the SOAP message using ZSI's ParsedSoap class ps = ParsedSoap(data, readerclass = ExpatReaderClass) # the root node of the SOAP message is the method_name ! method_name = ps.body_root.nodeName.split(':')[-1] if hasattr(self.servo, method_name): # if the served_object has a method called 'method_name' *************** *** 80,83 **** --- 83,90 ---- #args = tuple(ps.Parse(TC.Array('args',MyAny(nillable = True), undeclared = True))) arguments = ps.Parse(MyAny(nillable = True)) + + if 'http://gug.grid.niif.hu/schemas/2007/06' in data: + log.msg('arguments:', arguments) + if isinstance(arguments,dict): kw = arguments *************** *** 90,96 **** # create a SOAP message response = str(SoapWriter().serialize({'Response': result}, MyAny(nillable = True))) except Exception, e: # if there is any exception, first write it to the log ! # log.error() # or maybe not # get the string representation of the Exception class # use cgi.escape not to interfere with the xml syntax of the SOAP message --- 97,107 ---- # create a SOAP message response = str(SoapWriter().serialize({'Response': result}, MyAny(nillable = True))) + + if 'http://gug.grid.niif.hu/schemas/2007/06' in data: + log.msg('SOAP response:', response) + except Exception, e: # if there is any exception, first write it to the log ! log.error() # or maybe not # get the string representation of the Exception class # use cgi.escape not to interfere with the xml syntax of the SOAP message |
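The request/argument/response logging added here is the debug output commented out again in revision 1.20 above. Independently of that, this revision also starts stripping any namespace prefix from the SOAP body's root element before dispatching, so a qualified operation name still resolves to a method on the served object. The effect of the added split(':')[-1]:

    # Effect of the ps.body_root.nodeName.split(':')[-1] change: dispatch works the
    # same whether or not the client prefixed the operation name.
    for node_name in ('get', 'ns1:get'):
        print node_name.split(':')[-1]      # both print 'get'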
From: Roczei G. <ro...@us...> - 2007-06-26 13:57:49
|
Update of /cvsroot/gug/gug/gug/php In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv26885 Added Files: gis.php Log Message: The first PHP <--> WSDL <--> GUG service test --- NEW FILE: gis.php --- <?php $client = new SoapClient("http://localhost:21111/GIS"); $Request["type"] = "ClusterManager"; $Response = $client->get($Request); print "Service mdata:\n\n"; foreach ( get_object_vars($Response[0][0]) as $key1 => $value1 ) { if (gettype($value1) != "string") { print $key1 . ":\n"; foreach ( $value1 as $key2 => $value2 ) { print "\t" . $value2 . "\n"; } } else { print $key1 . "=".$value1 . "\n"; } } print "\nService data:\n\n" . $Response[0][1]."\n"; ?> |
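The PHP script exercises the same GIS query the Python services issue internally. For comparison, a rough Python counterpart using the project's SOAP client helper; the endpoint matches the one hard-coded in gis.php, and the (meta_data, service_data) pair layout follows how sched.py consumes GIS results, though the exact proxy behaviour of get_client for this call is an assumption:

    # Rough Python counterpart of gis.php, using the project's SOAP client helper.
    # Endpoint and result layout are taken from the PHP test and from sched.py's
    # usage of GIS results; treat this as a sketch, not a verified client.
    from gug.client.soap import get_client

    gis = get_client('http://localhost:21111/GIS')
    for meta_data, service_data in gis.get({'type': 'ClusterManager'}):
        print 'Service mdata:', meta_data
        print 'Service data:', service_data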