[Gug-cvs] gug/gug/module/dm dm.py, NONE, 1.1 dm_main.py, NONE, 1.1 __init__.py, NONE, 1.1
Status: Planning
Brought to you by:
szferi
From: Adrian T. <cs...@us...> - 2007-09-27 10:36:11
|
Update of /cvsroot/gug/gug/gug/module/dm In directory sc8-pr-cvs2.sourceforge.net:/tmp/cvs-serv23476/gug/module/dm Added Files: dm.py dm_main.py __init__.py Log Message: Adding Decision Maker module It is Integrated into the following services: Sched, CM, StM --- NEW FILE: dm.py --- '''Decision Maker Module ''' __revision__ = '$Revision: 1.1 $' from gug.common.jsdl import * from gug.common.simplexml import SimpleXML, SimpleXMLError from gug.common.share import import_class_from_string, nice_exception from gug.common import log from gug.common.idstore import IDStore, AbstractQueue from gug.common.config import Config import time class DecisionMaker: '''Decision Maker class''' def __init__(self, gis_proxy, config): self.gis_proxy = gis_proxy self.config = config #init resource backend resource_class = import_class_from_string(self.config['ResourceInterface']) self.resource = resource_class(self, gis_proxy, self.config) #init ordering algorithm order_class = import_class_from_string(self.config['Algorithm']) self.order = order_class(self, gis_proxy, self.config) #initial blacklist value self.blacklist_initial_value = 5 self.resource_list = {} self.blacklist = {} def query_resources(self): '''Querying resources from local GIS''' self._update_cache() self._refresh_black_list() self.resource.query_resources() def _change_states(self, queue): '''Processing status changes if needed''' self.resource.change_states(queue) def _update_cache(self): ''' Refreshing resource's timelife in the resource cache. The expired resources will be removed. ''' for res in self.resource_list.keys(): self.resource_list[res][0]['timelife'] = \ str(int(self.resource_list[res][0]['timelife']) - (int(time.time()) -\ int(self.resource_list[res][0]['timestamp']))) if int(self.resource_list[res][0]['timelife']) < 0: print 'resource (%s) timelife expired' % res del self.resource_list[res] def _refresh_black_list(self): '''Refreshing blacklisted resources''' for res in self.blacklist.keys(): self.blacklist[res] -= 1 if self.blacklist[res] <= 0: print 'resource (%s) removed from blacklist' % res del self.blacklist[res] def put_blacklist(self, resource): '''Put the given resource on blacklist''' self.blacklist[resource] = self.blacklist_initial_value def blacklisted(self,resource): '''Return True if resource is on blacklist''' return self.blacklist.get(resource, 0) > 0 def make_decisions(self, queue): ''' Map the queue item's with candidate set generator Input: scheduler's queue (an descendand of AbtractQueue class) Result: dictionary of {id: candidate_generator_object} ''' ret = {} self._change_states(queue) for item in self.resource.get_scheduling_items(queue): csg = self.resource.generate_candidate_set(item) filter = self.resource.get_resource_filter(item) candidates = self.resource.get_next_candidate(self.order.order(csg, filter), item) ret[queue.get_id(item)] = candidates return ret --- NEW FILE: dm_main.py --- from gug.client.soap import get_client from gug.module.dm.dm import DecisionMaker from gug.common.config import Config from gug.common.idstore import * if __name__ == '__main__': #cfg = Config('/home/totha/gug/gug/service/sched/sched.conf').get_dictionaries('/ServiceConfig/DecisionMaker')[0] #cfg = Config('/home/totha/gug/gug/service/cm/cm.conf').get_dictionaries('/ServiceConfig/DecisionMaker')[0] #cfg.update({'Architecture' : 'i686', 'OperatingSystem' : 'LINUX', 'TimeOut' : 600}) cfg = Config('/home/totha/gug/gug/service/stm/stm.conf.default').get_dictionaries('/ServiceConfig/DecisionMaker')[0] dm = DecisionMaker(get_client('http://localhost:21111/GIS'), cfg) dm.query_resources() #ids = IDStore("/home/totha/proba") ids = AbstractQueue() #ids.add( {'grid_id': 12234, 'status_long': None, 'jsdl': u'<?xml version="1.0" encoding="UTF-8"?>\n <jsdl:JobDefinition xmlns="http://schemas.ggf.org/jsdl/2005/11/jsdl"\n xmlns:jsdl="http://schemas.ggf.org/jsdl/2005/11/jsdl"\n xmlns:jsdl-gug="http://gug.grid.niif.hu/jsdl/2005/11/jsdl-gug"\n xmlns:jsdl-hpcpa="http://schemas.ogf.org/jsdl/2006/07/jsdl-hpcpa"\n targetNamespace="http://schemas.ggf.org/jsdl/2005/11/jsdl">\n <jsdl:JobDescription>\n <jsdl:JobIdentification>\n <jsdl:JobName>submit file example</jsdl:JobName>\n <jsdl:Description>This is a test job</jsdl:Description>\n <jsdl:JobProject>gug</jsdl:JobProject>\n </jsdl:JobIdentification>\n \n <jsdl:Application>\n <jsdl:ApplicationName>submit file example</jsdl:ApplicationName>\n <jsdl-hpcpa:BasicHPCApplication>\n <jsdl-hpcpa:Executable>bin/test.sh</jsdl-hpcpa:Executable><jsdl-hpcpa:Input>input/submit file example.stdin</jsdl-hpcpa:Input><jsdl-hpcpa:Output>output/output.txt</jsdl-hpcpa:Output>\n <jsdl-hpcpa:Error>output/error.txt</jsdl-hpcpa:Error>\n <jsdl-hpcpa:WorkingDirectory>\n jobdir/\n </jsdl-hpcpa:WorkingDirectory><jsdl-hpcpa:UserName>gug</jsdl-hpcpa:UserName>\n </jsdl-hpcpa:BasicHPCApplication><jsdl-gug:ApplicationType checkpoint="0">Binary</jsdl-gug:ApplicationType> \n </jsdl:Application><jsdl:Resources><jsdl:OperatingSystem><jsdl:OperatingSystemType><jsdl:OperatingSystemName>LINUX</jsdl:OperatingSystemName></jsdl:OperatingSystemType></jsdl:OperatingSystem><jsdl:CPUArchitecture><jsdl:CPUArchitectureName>other</jsdl:CPUArchitectureName><tns:OtherCPUArchitectures xmlns:tns="other_namespace">i686</tns:OtherCPUArchitectures></jsdl:CPUArchitecture><jsdl:TotalCPUCount><jsdl:Exact>1</jsdl:Exact></jsdl:TotalCPUCount><jsdl:DataStaging><jsdl:FileName>jobdir</jsdl:FileName><jsdl:Source><jsdl:URI>file:///grid/jobs/e/ed765d82-c6f6-4373-88a4-a80263dca90c</jsdl:URI></jsdl:Source></jsdl:DataStaging><jsdl:DataStaging><jsdl:FileName>jobdir/output</jsdl:FileName><jsdl:Target><jsdl:URI>file:///grid/jobs/e/ed765d82-c6f6-4373-88a4-a80263dca90c</jsdl:URI></jsdl:Target></jsdl:DataStaging><jsdl-gug:SoftwarePackage>\n <jsdl-gug:Name>None</jsdl-gug:Name>\n </jsdl-gug:SoftwarePackage></jsdl:Resources></jsdl:JobDescription></jsdl:JobDefinition>', 'run_os': None, 'sched_submit_time': 1188620417.373487, 'run_arch': None, 'jobname': u'submit file example', 'state': 2, 'owner': 'anonymous', 'grid_id': u'localhost-SuperScheduler/cab7076c-dd67-41c6-8e81-d9bffc2c08d7', 'jobtype': u'Binary'}) #ids.add({'grid_id': 1234, 'status': 2, 'jsdl': u'<?xml version="1.0" encoding="UTF-8"?>\n <jsdl:JobDefinition xmlns="http://schemas.ggf.org/jsdl/2005/11/jsdl"\n xmlns:jsdl="http://schemas.ggf.org/jsdl/2005/11/jsdl"\n xmlns:jsdl-gug="http://gug.grid.niif.hu/jsdl/2005/11/jsdl-gug"\n xmlns:jsdl-hpcpa="http://schemas.ogf.org/jsdl/2006/07/jsdl-hpcpa"\n targetNamespace="http://schemas.ggf.org/jsdl/2005/11/jsdl">\n <jsdl:JobDescription>\n <jsdl:JobIdentification>\n <jsdl:JobName>test</jsdl:JobName>\n <jsdl:Description>This is a test job</jsdl:Description>\n <jsdl:JobProject>gug</jsdl:JobProject>\n </jsdl:JobIdentification>\n \n <jsdl:Application>\n <jsdl:ApplicationName>test</jsdl:ApplicationName>\n <jsdl-hpcpa:BasicHPCApplication>\n <jsdl-hpcpa:Executable>bin/test.sh</jsdl-hpcpa:Executable><jsdl-hpcpa:Output>output/output.txt</jsdl-hpcpa:Output>\n <jsdl-hpcpa:Error>error/error.txt</jsdl-hpcpa:Error>\n <jsdl-hpcpa:WorkingDirectory>\n /home/totha/temp/test_job\n </jsdl-hpcpa:WorkingDirectory><jsdl-hpcpa:Environment name="JAVA_HOME">/opt/j2se</jsdl-hpcpa:Environment><jsdl-hpcpa:Environment name="PVM_ROOT">/home/gug</jsdl-hpcpa:Environment><jsdl-hpcpa:UserName>gug</jsdl-hpcpa:UserName>\n </jsdl-hpcpa:BasicHPCApplication><jsdl-gug:ApplicationType checkpoint="0">batch</jsdl-gug:ApplicationType> \n </jsdl:Application><jsdl:Resources><jsdl:OperatingSystem><jsdl:OperatingSystemType><jsdl:OperatingSystemName>LINUX</jsdl:OperatingSystemName></jsdl:OperatingSystemType></jsdl:OperatingSystem><jsdl:CPUArchitecture><jsdl:CPUArchitectureName>other</jsdl:CPUArchitectureName><tns:OtherCPUArchitectures xmlns:tns="other_namespace">i686</tns:OtherCPUArchitectures></jsdl:CPUArchitecture><jsdl:TotalCPUCount><jsdl:Exact>1</jsdl:Exact></jsdl:TotalCPUCount><jsdl-gug:SoftwarePackage>\n <jsdl-gug:Name>None</jsdl-gug:Name>\n </jsdl-gug:SoftwarePackage></jsdl:Resources></jsdl:JobDescription></jsdl:JobDefinition>', 'exec_id': False, 'cpu_usage': '-1', 'exec_job_id': False, 'error_message': '', 'last_check_time': 1188619008.5796659, 'exit_code': None, 'mem_usage': '-1', 'grid_id': '711c1f59-0fcb-4a9b-8268-efcdb5666357', 'submit_time': 1188619008.57967, 'exec_submit_time': 1188619008.5796731, 'desc': {'totalphysicalmemory': '0', 'candidatehosts': '127.0.0.1', 'ckpt_period': 10000, 'totalcpucount': u'1', 'operatingsystem': u'LINUX', 'environment': {u'JAVA_HOME': u'/opt/j2se', u'PVM_ROOT': u'/home/gug'}, 'executable': u'bin/test.sh', 'totalvirtualmemory': '0', 'checkpoint': '1', 'arguments': [], 'input': None, 'job_migration': 0, 'username': u'gug', 'description': u'This is a test job', 'jobproject': u'gug', 'individualphysicalmemory': '0', 'directory_structure': False, 'batchjob': True, 'initialdir': u'/home/totha/temp/test_job', 'individualvirtualmemory': '0', 'name': u'test', 'universe': u'batch', 'architecture': '*', 'error': u'error/error.txt', 'exclusiveexecution': '0', 'output': u'output/output.txt'}}) ids.add(14231) ids.add(13231453) map = dm.make_decisions(ids) #print map while len(map) > 0: try: for item in map.keys(): res = map[item].next() print item, res except: del map[item] # # dm.query_resources() # map = dm.make_decisions(ids) # former = None # while len(map) > 0: # # try: # for item in map.keys(): # res = map[item].next() # if former: # print former, dm.resource_list[former][1]['FreeJobPlace'] # print res, dm.resource_list[res][1]['FreeJobPlace'], item # former = res # except: # del map[item] --- NEW FILE: __init__.py --- '''Decision Maker module''' __revision__ = '$Revision: 1.1 $' |