From: <bsc...@us...> - 2011-03-31 07:45:05
|
Revision: 9641 http://unicore.svn.sourceforge.net/unicore/?rev=9641&view=rev Author: bschuller Date: 2011-03-31 07:44:58 +0000 (Thu, 31 Mar 2011) Log Message: ----------- better solution for async, non-batch execution Modified Paths: -------------- xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/ems/ExecutionContext.java xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/jsdl/JSDLProcessor.java xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/jsdl/PrePostActionProcessor.java xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/Execution.java xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/TSIUtils.java xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/simple/BasicManager.java xnjs/trunk/xnjs-module-core/src/test/resources/ems/execution_environment_with_precommand.jsdl Modified: xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/ems/ExecutionContext.java =================================================================== --- xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/ems/ExecutionContext.java 2011-03-31 07:18:17 UTC (rev 9640) +++ xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/ems/ExecutionContext.java 2011-03-31 07:44:58 UTC (rev 9641) @@ -125,6 +125,9 @@ this.workingDirectory = workingDirectory; } + /** + * @return Returns the outcome directory (which is guaranteed to end with the target system's file separator) + */ public String getOutcomeDirectory() { return outcomeDirectory!=null? outcomeDirectory : workingDirectory; } Modified: xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/jsdl/JSDLProcessor.java =================================================================== --- xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/jsdl/JSDLProcessor.java 2011-03-31 07:18:17 UTC (rev 9640) +++ xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/jsdl/JSDLProcessor.java 2011-03-31 07:44:58 UTC (rev 9641) @@ -61,6 +61,7 @@ import de.fzj.unicore.xnjs.ems.event.ContinueProcessingEvent; import de.fzj.unicore.xnjs.ems.event.XnjsEvent; import de.fzj.unicore.xnjs.ems.processors.DefaultProcessor; +import de.fzj.unicore.xnjs.jsdl.PrePostActionProcessor.SubCommand; import de.fzj.unicore.xnjs.resources.ResourceSet; import de.fzj.unicore.xnjs.simple.BasicExecution; import de.fzj.unicore.xnjs.tsi.ApplicationInfo; @@ -300,7 +301,10 @@ else{ assurePrePostProcessingAvailable(); String userPre=ee.getUserPreCommand(); - String subID=createPrePostAction(userPre); + SubCommand cmd=new SubCommand(); + cmd.type="PRE"; + cmd.cmd=userPre; + String subID=createPrePostAction(cmd); action.getProcessingContext().put(subactionkey_pre, subID); action.getProcessingContext().set(ApplicationExecutionStatus.precommandRunning()); } @@ -345,7 +349,10 @@ } else{ assurePrePostProcessingAvailable(); - String cmd=ee.getUserPostCommand(); + String userPost=ee.getUserPostCommand(); + SubCommand cmd=new SubCommand(); + cmd.type="POST"; + cmd.cmd=userPost; String subID=createPrePostAction(cmd); action.getProcessingContext().put(subactionkey_post, subID); action.getProcessingContext().set(ApplicationExecutionStatus.postcommandRunning()); @@ -391,8 +398,8 @@ * @return the ID of the sub action * @throws ExecutionException */ - protected String createPrePostAction(String...cmds)throws ExecutionException{ - return manager.addSubAction(cmds, XNJSConstants.jsdlPrePostActionType, action, true); + protected String createPrePostAction(SubCommand cmd)throws ExecutionException{ + return manager.addSubAction(cmd, XNJSConstants.jsdlPrePostActionType, action, true); } /** @@ -408,11 +415,11 @@ return; } action.setWaiting(true); + exec.submit(action); + ApplicationExecutionStatus aes=action.getProcessingContext().get(ApplicationExecutionStatus.class); + aes.set(ApplicationExecutionStatus.MAIN_EXECUTION); action.setStatus(ActionStatus.QUEUED); storeTimeStamp(SUBMITTED); - ApplicationExecutionStatus aes=action.getProcessingContext().get(ApplicationExecutionStatus.class); - aes.set(ApplicationExecutionStatus.MAIN_EXECUTION); - exec.submit(action); }catch(ExecutionException ex){ String msg="Could not submit job: "+ex.getMessage(); action.addLogTrace(msg); Modified: xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/jsdl/PrePostActionProcessor.java =================================================================== --- xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/jsdl/PrePostActionProcessor.java 2011-03-31 07:18:17 UTC (rev 9640) +++ xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/jsdl/PrePostActionProcessor.java 2011-03-31 07:44:58 UTC (rev 9641) @@ -1,6 +1,6 @@ package de.fzj.unicore.xnjs.jsdl; -import java.util.concurrent.TimeUnit; +import java.io.Serializable; import org.apache.log4j.Logger; @@ -10,12 +10,12 @@ import de.fzj.unicore.xnjs.ems.ExecutionContext; import de.fzj.unicore.xnjs.ems.ExecutionException; import de.fzj.unicore.xnjs.ems.ProcessingException; -import de.fzj.unicore.xnjs.ems.event.ContinueProcessingEvent; import de.fzj.unicore.xnjs.ems.processors.DefaultProcessor; import de.fzj.unicore.xnjs.legacy.TSIUtils; import de.fzj.unicore.xnjs.resources.ResourceSet; import de.fzj.unicore.xnjs.tsi.ApplicationInfo; import de.fzj.unicore.xnjs.tsi.IExecution; +import de.fzj.unicore.xnjs.tsi.TSI; import de.fzj.unicore.xnjs.tsi.TSIBusyException; import de.fzj.unicore.xnjs.util.LogUtil; @@ -45,43 +45,33 @@ } protected void submit()throws ExecutionException{ + SubCommand subCommand = (SubCommand)action.getAjd(); ExecutionContext ec=action.getExecutionContext(); - ec.setOutcomeDirectory(ec.getWorkingDirectory()+"_PRECOMMAND_OUTPUT"); - configuration.getTargetSystemInterface(action.getClient()).mkdir(ec.getOutcomeDirectory()); - ec.setExitCodeFileName("PRECOMMAND_"+TSIUtils.EXITCODE_FILENAME); + TSI tsi=configuration.getTargetSystemInterface(action.getClient()); + ec.setOutcomeDirectory(ec.getWorkingDirectory()+"_UNICORE_"+subCommand.type+tsi.getFileSeparator()); + tsi.mkdir(ec.getOutcomeDirectory()); + ec.setExitCodeFileName(subCommand.type+"_"+TSIUtils.EXITCODE_FILENAME); ec.setInteractive(true); - StringBuilder sb=new StringBuilder(); - String[]preCmds=(String[])action.getAjd(); - boolean first=true; - for(String p: preCmds){ - if(!first){ - sb.append("&& "); //TODO windows?! - }else first=false; - sb.append(p); - } - - String exe=sb.toString(); - action.addLogTrace("Executing command: "+exe); + ec.setDaemon(true); + action.addLogTrace("Executing command: "+subCommand.cmd); action.setStatus(ActionStatus.PENDING); - ApplicationInfo appInfo=new ApplicationInfo(); - appInfo.setExecutable(exe); + ApplicationInfo appInfo=action.getApplicationInfo(); + appInfo.setExecutable(subCommand.cmd); ec.setResourceRequest(new ResourceSet()); - action.setApplicationInfo(appInfo); } @Override protected void handlePending()throws ProcessingException{ try{ sleep(1000); + action.setStatus(ActionStatus.QUEUED); exec.submit(action); - action.setStatus(ActionStatus.RUNNING); }catch(ExecutionException e){ throw new ProcessingException(e); }catch(TSIBusyException tbe){ logger.debug("Could not submit action",tbe); //will retry later - action.setWaiting(true); - manager.scheduleEvent(new ContinueProcessingEvent(action.getUUID()), 1000, TimeUnit.MILLISECONDS); + action.setWaiting(false); } } @@ -90,7 +80,7 @@ try{ exec.updateStatus(action); if(action.getStatus()==ActionStatus.QUEUED){ - action.setWaiting(true); + sleep(3000); } }catch(ExecutionException ex){ String msg="Could not update status: "+ex.getMessage(); @@ -105,14 +95,12 @@ */ protected void handleRunning()throws ProcessingException{ if(logger.isTraceEnabled())logger.trace("Handling RUNNING state for Action "+action.getUUID()); + logger.info("Handling RUNNING state for Action "+action.getUUID()); try{ exec.updateStatus(action); if(action.getStatus()==ActionStatus.RUNNING){ sleep(3000); } - else if(action.getStatus()==ActionStatus.POSTPROCESSING){ - setToSuccess(); - } }catch(ExecutionException ex){ String msg="Could not update status for action "+ex.getMessage(); action.addLogTrace(msg); @@ -120,6 +108,12 @@ } } + + @Override + protected void handlePostProcessing()throws ProcessingException{ + setToSuccess(); + } + protected void setToSuccess(){ action.setStatus(ActionStatus.DONE); action.setResult(new ActionResult(ActionResult.SUCCESSFUL)); @@ -129,4 +123,11 @@ protected void handleRemoving()throws ProcessingException{ //NOP } + + public static class SubCommand implements Serializable{ + private final static long serialVersionUID=1l; + public String cmd; //the command to execute + public String type; //PRE or POST + } + } Modified: xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/Execution.java =================================================================== --- xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/Execution.java 2011-03-31 07:18:17 UTC (rev 9640) +++ xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/Execution.java 2011-03-31 07:44:58 UTC (rev 9641) @@ -104,6 +104,18 @@ */ public static final String DEFAULT_GRACE_PERIOD="0"; + /** + * this property holds the maximum runtime in seconds + */ + public static final String MAX_RUNTIME_FOR_INTERACTIVE_APPS="CLASSICTSI.interactive_execution.maxtime"; + + /** + * using this key, the usual grace period can be overridden + * (and stored in the action's processing context) + */ + public static final String CUSTOM_GRACE_PERIOD="CLASSICTSI.statusupdate.grace.custom"; + + public Execution(Configuration config){ super(config); factory=configuration.getComponentInstanceOfType(TSIConnectionFactory.class); @@ -143,12 +155,27 @@ ApplicationInfo appDescription=job.getApplicationInfo(); if(appDescription==null)throw new ExecutionException("No application info, can't submit."); incarnationTweaker.preScript(appDescription, job, configuration.getIDB()); + ExecutionContext ec=job.getExecutionContext(); + + if(ec.isInteractive()){ + //use the grace period mechanism to allow interactive jobs a maximum runtime + int gr=computeGracePeriodForInteractiveApps(); + job.getProcessingContext().put(CUSTOM_GRACE_PERIOD, gr); + job.addLogTrace("Interactive execution: allowing maximum runtime of "+gr*updateInterval/1000 + " seconds."); + } + try{ + boolean execAsync=ec.isInteractive() && ec.isDaemon(); + + String tsiCmdInitial=execAsync ? + TSIUtils.makeExecuteAsyncScript(job,configuration) + : TSIUtils.makeSubmitCommand(job, configuration.getIDB()); + String tsiCmd=incarnationTweaker.postScript(appDescription, job, configuration.getIDB(), tsiCmdInitial); + + conn=configuration.getComponentInstanceOfType(TSIConnectionFactory.class).getTSIConnection(job.getClient()); + String msg; + synchronized(lock){ - String tsiCmdInitial=TSIUtils.makeSubmitCommand(job, configuration.getIDB()); - String tsiCmd=incarnationTweaker.postScript(appDescription, job, configuration.getIDB(), tsiCmdInitial); - - conn=configuration.getComponentInstanceOfType(TSIConnectionFactory.class).getTSIConnection(job.getClient()); String res=conn.send(tsiCmd); job.addLogTrace("Command is: \n"+tsiCmd); if(res.contains("TSI_FAILED")){ @@ -158,20 +185,20 @@ job.addLogTrace("TSI reply: submission OK."); String bssid=res.trim(); //strip newline - String msg="Submitted to classic TSI as ["+conn.getIdLine()+"] with BSSID="+bssid; + msg="Submitted to classic TSI as ["+conn.getIdLine()+"] with BSSID="+bssid; String initialState="UNKNOWN"; String internalID=bssid; if(job.getExecutionContext().isInteractive()){ msg="Submitted to classic TSI as ["+conn.getIdLine()+"] bypassing the BSS."; - initialState="COMPLETED"; + if(!ec.isDaemon())initialState="COMPLETED"; internalID=job.getUUID(); } job.setBSID(internalID); - jobExecLogger.debug(msg); - job.addLogTrace(msg); BSSInfo newJob=new BSSInfo(internalID,job.getUUID(), initialState); bssInfo.put(internalID, newJob); } + jobExecLogger.debug(msg); + job.addLogTrace(msg); }catch(Exception ex){ jobExecLogger.error("Error submitting job.",ex); @@ -188,13 +215,17 @@ */ public void updateStatus(Action job) throws ExecutionException { try{ + int myGracePeriod=gracePeriod; final String bssID=job.getBSID(); + if(bssID==null){ + throw new ExecutionException("Status check can't be done: action with ID "+job.getUUID()+" does not have a batch system ID."); + } String status; BSSInfo info=bssInfo.get(bssID); status=info!=null?info.bssState:null; if(jobExecLogger.isDebugEnabled())jobExecLogger.debug("Action with bssid="+bssID+" is "+status); - + if("QUEUED".equals(status)){ job.setStatus(ActionStatus.QUEUED); } @@ -204,7 +235,7 @@ updateProgress(job); job.setStatus(ActionStatus.RUNNING); } - else if(status==null){ + else if("UNKNOWN".equals(status) || status==null){ //check if exit code can be read boolean haveExitCode=getExitCode(job); String jobID=job.getUUID(); @@ -217,7 +248,11 @@ job.getProcessingContext().put(GRACE_PERIOD,i); job.setDirty(); - if(i.intValue()-1<gracePeriod){ + //check for a custom grace period + Integer g=(Integer)job.getProcessingContext().get(CUSTOM_GRACE_PERIOD); + if(g!=null)myGracePeriod=g.intValue(); + + if(i.intValue()-1<myGracePeriod){ if(jobExecLogger.isDebugEnabled())jobExecLogger.debug("No BSS status found for "+jobID+" BSS id="+bssID+", will re-check."); bssInfo.put(bssID,new BSSInfo(bssID, jobID, "UNKNOWN")); } @@ -247,7 +282,7 @@ job.fail(); } } - }catch(Exception ex){//wrap it + }catch(Exception ex){ jobExecLogger.error("Error updating job status.",ex); throw new ExecutionException(ex); } @@ -306,7 +341,7 @@ InputStream is=null; BufferedReader br=null; try{ - is=tsi.getInputStream(ctx.getWorkingDirectory()+"/"+ctx.getExitCodeFileName()); + is=tsi.getInputStream(ctx.getOutcomeDirectory()+ctx.getExitCodeFileName()); br=new BufferedReader(new InputStreamReader(is)); try{ String s=br.readLine(); @@ -341,7 +376,20 @@ public int getTotalNumberOfJobs() { return summary.total; } + + /** + * compute an appropriate grace period to ensure an interactice + * app is not assumed finished before some minimum runtime has + * passed. The value is computed by using a admin-controlled property + * + * @return + */ + public int computeGracePeriodForInteractiveApps(){ + int maxTimeMillis=1000*Integer.parseInt(configuration.getProperty(MAX_RUNTIME_FOR_INTERACTIVE_APPS, "3600")); + return maxTimeMillis/updateInterval; + } + /** * @see de.fzj.unicore.xnjs.simple.BasicExecution#pause(de.fzj.unicore.xnjs.ems.Action) */ @@ -400,6 +448,7 @@ } } + public static class BSSInfo{ String bssID; String jobID; Modified: xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/TSIUtils.java =================================================================== --- xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/TSIUtils.java 2011-03-31 07:18:17 UTC (rev 9640) +++ xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/legacy/TSIUtils.java 2011-03-31 07:44:58 UTC (rev 9641) @@ -100,9 +100,8 @@ } /** - * make a legacy TSI "submit" command from a PosixApp, Action, and - * ExecutionContext<br/> (partly taken from XNJS 5 NJS's BatchTargetSystem - * code) + * make a legacy TSI "submit" command <br/> + * (partly taken from XNJS 5 NJS's BatchTargetSystem code) * * @param job - the job * @param idb - the IDB to use @@ -267,6 +266,42 @@ } /** + * generate an EXECUTE_SCRIPT command for running a command asynchronously + * + * @param command + * the command to execute + * @param executioncontext + * (may be null) + * @return a string for sending to the classic TSI + */ + public static String makeExecuteAsyncScript(Action job, Configuration config) { + + String template = config.getIDB().getExecuteTemplate(); + if (template == null) + template = DEFAULT_TEMPLATE; + + template = template.replace(TEMPLATE_COMMAND, "#TSI_EXECUTESCRIPT"); + ExecutionContext ec=job.getExecutionContext(); + ApplicationInfo ai=job.getApplicationInfo(); + ec.getEnvironment().putAll(ai.getEnvironment()); + StringBuilder commands = new StringBuilder(); + commands.append("#TSI_SCRIPT\n"); + commands.append("#TSI_DISCARD_OUTPUT true\n"); + + // add environment settings from context + if (ec != null) { + appendEnvironment(commands, ec); + } + commands.append(ai.getExecutable()); + + commands.append(" > ").append(ec.getOutcomeDirectory()).append(ec.getStdout()); + commands.append(" 2> ").append(ec.getOutcomeDirectory()).append(ec.getStderr()); + + commands.append("&& echo $? > ").append(ec.getOutcomeDirectory()).append("/") + .append(ec.getExitCodeFileName()).append(" &"); + return template.replace(TEMPLATE_SCRIPT, commands.toString()); + } + /** * parse the status listing returned by TSI GetStatusListing and modify the * map holding the states<br/> A {@link ContinueProcessingEvent} is * generated for an action, if Modified: xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/simple/BasicManager.java =================================================================== --- xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/simple/BasicManager.java 2011-03-31 07:18:17 UTC (rev 9640) +++ xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/simple/BasicManager.java 2011-03-31 07:44:58 UTC (rev 9641) @@ -65,6 +65,7 @@ import de.fzj.unicore.xnjs.ems.event.XnjsEvent; import de.fzj.unicore.xnjs.management.ManagedComponent; import de.fzj.unicore.xnjs.persistence.IActionStore; +import de.fzj.unicore.xnjs.tsi.ApplicationInfo; import de.fzj.unicore.xnjs.tsi.IExecutionSystemInformation; import de.fzj.unicore.xnjs.util.LogUtil; import eu.unicore.security.Client; @@ -542,6 +543,12 @@ soa.setAjd(jobDescription); soa.setProcessingContext(parentAction.getProcessingContext()); configuration.getExecutionContextMgr().createChildContext(parentAction,soa); + if(parentAction.getApplicationInfo()!=null){ + //copy environment + ApplicationInfo childAppInfo=new ApplicationInfo(); + childAppInfo.getEnvironment().putAll(parentAction.getApplicationInfo().getEnvironment()); + soa.setApplicationInfo(childAppInfo); + } addInternalAction(soa); return soa.getUUID(); } Modified: xnjs/trunk/xnjs-module-core/src/test/resources/ems/execution_environment_with_precommand.jsdl =================================================================== --- xnjs/trunk/xnjs-module-core/src/test/resources/ems/execution_environment_with_precommand.jsdl 2011-03-31 07:18:17 UTC (rev 9640) +++ xnjs/trunk/xnjs-module-core/src/test/resources/ems/execution_environment_with_precommand.jsdl 2011-03-31 07:44:58 UTC (rev 9641) @@ -8,7 +8,7 @@ </jsdl:Application> <ee:ExecutionEnvironment xmlns:ee="http://www.unicore.eu/unicore/jsdl-extensions"> <ee:Name>TIME</ee:Name> - <ee:UserPreCommand>/bin/date</ee:UserPreCommand> + <ee:UserPreCommand>/bin/sleep 10</ee:UserPreCommand> <ee:UserPostCommand>/bin/date</ee:UserPostCommand> </ee:ExecutionEnvironment> </jsdl:JobDescription> This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |