From: <bsc...@us...> - 2011-03-31 07:55:03
|
Revision: 9643 http://unicore.svn.sourceforge.net/unicore/?rev=9643&view=rev Author: bschuller Date: 2011-03-31 07:54:54 +0000 (Thu, 31 Mar 2011) Log Message: ----------- merge stuff from trunk Modified Paths: -------------- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/pom.xml xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/XNJSConstants.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/Action.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/ExecutionContext.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/InternalManager.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/ProcessingContext.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/processors/DefaultProcessor.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/processors/UsageLogger.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/incarnation/IncarnationTweaker.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/GrounderImpl.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/IncarnatedExecutionEnvironment.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/Incarnation.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/JSDLParser.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/JSDLProcessor.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/legacy/Execution.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/legacy/TSIUtils.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/resources/ResourceSet.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/simple/BasicExecution.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/simple/BasicManager.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/simple/LocalECManager.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/simple/LocalExecution.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/simple/SimpleDataStagingProcessor.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/tsi/ApplicationInfo.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/tsi/IExecution.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/util/ExecuteScript.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/test/java/de/fzj/unicore/xnjs/ems/MultiJobTest.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/test/java/de/fzj/unicore/xnjs/ems/TestJSDLProcessing.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/test/java/de/fzj/unicore/xnjs/jsdl/TestJSDLResourceSet.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/test/java/de/fzj/unicore/xnjs/jsdl/TestUtils.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/test/java/de/fzj/unicore/xnjs/legacy/TestJobProcessingLegacyTSI.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/test/java/de/fzj/unicore/xnjs/simple/TestLocalTS.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/test/resources/ems/example.jsdl xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/test/resources/ems/simpleidb Added Paths: ----------- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/ApplicationExecutionStatus.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/EMSStatistics.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/PrePostActionProcessor.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/simple/Statistics.java xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/test/resources/ems/execution_environment_with_precommand.jsdl Property Changed: ---------------- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/ Property changes on: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1 ___________________________________________________________________ Modified: svn:mergeinfo - /xnjs/trunk/xnjs-module-core:9228-9459 + /xnjs/trunk/xnjs-module-core:9228-9642 Modified: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/pom.xml =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/pom.xml 2011-03-31 07:46:44 UTC (rev 9642) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/pom.xml 2011-03-31 07:54:54 UTC (rev 9643) @@ -5,7 +5,7 @@ <artifactId>xnjs-module-core</artifactId> <packaging>jar</packaging> <name>XNJS Core</name> - <version>1.4.0-rc2</version> + <version>1.4.0-rc3</version> <parent> <groupId>eu.unicore</groupId> <artifactId>unicore-parent-endorsed</artifactId> @@ -33,7 +33,7 @@ <dependency> <groupId>de.fzj.unicore</groupId> <artifactId>jsdl-xmlbeans</artifactId> - <version>2.0.0</version> + <version>2.0.1</version> </dependency> <dependency> <groupId>eu.unicore.security</groupId> @@ -82,7 +82,7 @@ <dependency> <groupId>de.fzj.unicore.metrix</groupId> <artifactId>metrix</artifactId> - <version>0.4</version> + <version>0.5</version> </dependency> <dependency> <groupId>log4j</groupId> Modified: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/XNJSConstants.java =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/XNJSConstants.java 2011-03-31 07:46:44 UTC (rev 9642) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/XNJSConstants.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -44,5 +44,6 @@ public static final String jsdlActionType="JSDL"; public static final String jsdlStageInActionType="JSDL_STAGEIN"; public static final String jsdlStageOutActionType="JSDL_STAGEOUT"; + public static final String jsdlPrePostActionType="PRE_POST_PROCESSING"; } Modified: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/Action.java =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/Action.java 2011-03-31 07:46:44 UTC (rev 9642) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/Action.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -29,8 +29,8 @@ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. *********************************************************************************/ - + package de.fzj.unicore.xnjs.ems; import java.io.PrintWriter; @@ -42,6 +42,7 @@ import de.fzj.unicore.persist.annotations.ID; import de.fzj.unicore.persist.annotations.Table; import de.fzj.unicore.xnjs.jsdl.IncarnatedExecutionEnvironment; +import de.fzj.unicore.xnjs.tsi.ApplicationInfo; import eu.unicore.security.Client; /** @@ -53,19 +54,19 @@ @Table(name="JOBS") public class Action implements Serializable { private static final long serialVersionUID=984059803495L; - + //the unique id of the action private String UUID; //the type of action private String type; - + //optional the id assigned by the batch system private String BSID; - + //the time by which the action will no longer be valid private Date terminationTime; - + //the EMS status of the action private int status; @@ -74,28 +75,34 @@ //the owner of this action private Client client; - + + //human readable name + private String jobName; + //the description of the action as understood by the executing entity //TODO should probably use a AJD class implementing serializable... private Serializable ajd; - + //the original, "as submitted" version of the ajd private Serializable originalAjd=null; - + private ActionResult result; - + //a log trace private final List<String> log; - + //a context for use during processing: can use arbitrary objects objects private ProcessingContext processingContext; - + //the context during execution private ExecutionContext executionContext; - + //the execution environment private IncarnatedExecutionEnvironment executionEnvironment; - + + //the application information + private ApplicationInfo applicationInfo; + //if this is true the action has been modified and needs to be persisted private transient volatile boolean dirty=false; @@ -105,7 +112,7 @@ private transient volatile boolean waiting=false; public static final String AUTO_SUBMIT="EMS_AUTOSUBMIT"; - + //a default constructor for creating an uninitialised Action public Action(){ UUID=java.util.UUID.randomUUID().toString(); @@ -130,7 +137,7 @@ BSID = bsid; setDirty(); } - + /** * @return Returns the UUID. */ @@ -138,21 +145,21 @@ public String getUUID() { return UUID; } - + /** * @return Returns the status. */ public int getStatus() { return status; } - + /** * @return Returns the status. */ public String getStatusAsString() { return ActionStatus.toString(status); } - + /** * Set the action status. * This method is only to be invoked by the EMS itself, not in client code. @@ -162,14 +169,14 @@ this.status = status; setDirty(); } - + /** * @return Returns the terminationTime. */ public java.util.Date getTerminationTime() { return terminationTime!=null?(Date)terminationTime.clone():null; } - + /** * @param terminationTime The terminationTime to set. */ @@ -177,14 +184,14 @@ this.terminationTime = terminationTime!=null?(Date)terminationTime.clone():null; setDirty(); } - + /** * @return Returns the ajd. */ public Object getAjd() { return ajd; } - + /** * @param ajd The ajd to set. */ @@ -192,7 +199,7 @@ this.ajd = ajd; setDirty(); } - + /** * @param uuid The UUID to set. */ @@ -200,7 +207,7 @@ UUID = uuid; setDirty(); } - + /** * @return Returns the client. */ @@ -214,7 +221,7 @@ this.client = client; setDirty(); } - + /** * add a log entry to this action. A time stamp is added automatically */ @@ -222,7 +229,7 @@ log.add(new Date().toString()+": "+tr); setDirty(); } - + /** * print the full logtrace to stdout */ @@ -231,7 +238,7 @@ printLogTrace(pw); pw.flush(); } - + /** * print the full logtrace to a PrintWriter * @param writer PrintWriter @@ -251,7 +258,7 @@ List<String> l=a.getLog(); for(String s: l)addLogTrace(s); } - + /** * @return Returns the log. */ @@ -259,7 +266,7 @@ return log; } - + /** * @return Returns the type. */ @@ -321,7 +328,7 @@ public void setExecutionContext(ExecutionContext executionContext) { this.executionContext = executionContext; } - + /** * @return Returns the executionEnvironment */ @@ -335,7 +342,15 @@ public void setExecutionEnvironment(IncarnatedExecutionEnvironment executionEnvironment) { this.executionEnvironment=executionEnvironment; } - + + public ApplicationInfo getApplicationInfo() { + return applicationInfo; + } + + public void setApplicationInfo(ApplicationInfo applicationInfo) { + this.applicationInfo = applicationInfo; + } + /** * @return Returns the transitionalStatus. */ @@ -363,15 +378,32 @@ */ public void setOriginalAjd(Serializable originalAjd) { if(this.originalAjd!=null)throw new - RuntimeException("Tried setting original job description more than once."); + RuntimeException("Tried setting original job description more than once."); else this.originalAjd = originalAjd; setDirty(); } - + + /** + * get the human readable job name + * @return + */ + public String getJobName() { + return jobName; + } + + /** + * set the (human-readable) job name + * @param jobName - the job name to set + */ + public void setJobName(String jobName) { + this.jobName = jobName; + setDirty(); + } + public void setDirty(){dirty=true;} public boolean isDirty(){return dirty;} public void clearDirty(){dirty=false;} - + public boolean isWaiting(){return waiting;} /** @@ -380,7 +412,7 @@ public void setWaiting(boolean waiting){ this.waiting=waiting; } - + /** * helper method that sets the action to "FAILED" */ @@ -395,7 +427,7 @@ setStatus(ActionStatus.DONE); addLogTrace("Status set to DONE."); } - + public String toString(){ StringBuilder sb=new StringBuilder(); sb.append("Action ID : ").append(getUUID()).append("\n"); @@ -410,5 +442,5 @@ sb.append("Orig. Definition: ").append(getOriginalAjd()).append("\n"); return sb.toString(); } - + } Copied: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/ApplicationExecutionStatus.java (from rev 9629, xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/ems/ApplicationExecutionStatus.java) =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/ApplicationExecutionStatus.java (rev 0) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/ApplicationExecutionStatus.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -0,0 +1,124 @@ +/********************************************************************************* + * Copyright (c) 2011 Forschungszentrum Juelich GmbH + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * (1) Redistributions of source code must retain the above copyright notice, + * this list of conditions and the disclaimer at the end. Redistributions in + * binary form must reproduce the above copyright notice, this list of + * conditions and the following disclaimer in the documentation and/or other + * materials provided with the distribution. + * + * (2) Neither the name of Forschungszentrum Juelich GmbH nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * DISCLAIMER + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + *********************************************************************************/ + + +package de.fzj.unicore.xnjs.ems; + +import java.io.Serializable; + +/** + * sub-status codes for tracking the execution of the actual application + * including pre- and post-commands + * + * @author schuller + */ +public class ApplicationExecutionStatus implements Serializable{ + + private static final long serialVersionUID = 1L; + + private int status; + + /** + * create a new application execution status in state CREATED + */ + public ApplicationExecutionStatus(){ + this.status=CREATED; + } + + public ApplicationExecutionStatus(int status){ + this.status=status; + } + + public int get(){ + return status; + } + + public void set(int status){ + this.status=status; + } + + //?? + public static final int UNKNOWN=-1; + + //just created + public static final int CREATED=0; + + //precommand(s) are executing + public static final int PRECOMMAND_EXECUTION=1; + + public static final int PRECOMMAND_DONE=2; + + //main thing is running + public static final int MAIN_EXECUTION=50; + //main thing is running + public static final int MAIN_EXECUTION_DONE=51; + + //post-command(s) are executing + public static final int POSTCOMMAND_EXECUTION=70; + + //finished execution + public static final int DONE=99; + + public String toString(){ + return asString(status); + } + + public static String asString(int s){ + switch(s){ + case UNKNOWN: return "UNKNOWN"; + case CREATED: return "CREATED"; + case PRECOMMAND_EXECUTION: return "PRECOMMAND_EXECUTION"; + case PRECOMMAND_DONE: return "PRECOMMAND_DONE"; + case MAIN_EXECUTION: return "MAIN_EXECUTION"; + case MAIN_EXECUTION_DONE: return "MAIN_EXECUTION_DONE"; + case POSTCOMMAND_EXECUTION: return "POSTCOMMAND_EXECUTION"; + case DONE: return "DONE"; + } + return null; + } + + public static ApplicationExecutionStatus done(){ + return new ApplicationExecutionStatus(DONE); + } + public static ApplicationExecutionStatus precommandDone(){ + return new ApplicationExecutionStatus(PRECOMMAND_DONE); + } + public static ApplicationExecutionStatus precommandRunning(){ + return new ApplicationExecutionStatus(PRECOMMAND_EXECUTION); + } + public static ApplicationExecutionStatus mainExecutionDone(){ + return new ApplicationExecutionStatus(MAIN_EXECUTION_DONE); + } + public static ApplicationExecutionStatus postcommandRunning(){ + return new ApplicationExecutionStatus(POSTCOMMAND_EXECUTION); + } +} Copied: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/EMSStatistics.java (from rev 9629, xnjs/trunk/xnjs-module-core/src/main/java/de/fzj/unicore/xnjs/ems/EMSStatistics.java) =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/EMSStatistics.java (rev 0) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/EMSStatistics.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -0,0 +1,22 @@ +package de.fzj.unicore.xnjs.ems; + +import de.fzj.unicore.xnjs.management.DefaultImplementation; +import de.fzj.unicore.xnjs.simple.Statistics; + +@DefaultImplementation(clazz=Statistics.class) + +public interface EMSStatistics { + + /** + * get the mean time in millis that actions are in the "Queued" state + * @return mean "queued" time in millis + */ + public long getMeanTimeQueued(); + + /** + * store a new measured value + * @param value - the measured queued time + */ + public void updateMeanTimeQueued(long value); + +} Modified: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/ExecutionContext.java =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/ExecutionContext.java 2011-03-31 07:46:44 UTC (rev 9642) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/ExecutionContext.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -36,6 +36,7 @@ import java.io.Serializable; import java.util.HashMap; +import de.fzj.unicore.xnjs.legacy.TSIUtils; import de.fzj.unicore.xnjs.resources.ResourceSet; /** @@ -51,6 +52,8 @@ private String actionID; //the working dir private String workingDirectory; + //the outcome dir (by default equal to the working dir) + private String outcomeDirectory; //environment variables private HashMap<String,String> environment; //some id where this job is running (node, ip, url, whatever) @@ -76,6 +79,8 @@ //the resources requested for executing the job private ResourceSet resourceRequest; + private String exitCodeFileName=TSIUtils.EXITCODE_FILENAME; + //create an uninitialized ExecutionContext public ExecutionContext(){ environment=new HashMap<String,String>(); @@ -121,6 +126,20 @@ } /** + * @return Returns the outcome directory (which is guaranteed to end with the target system's file separator) + */ + public String getOutcomeDirectory() { + return outcomeDirectory!=null? outcomeDirectory : workingDirectory; + } + + /** + * @param outcomeDirectory The outcome directory to set + */ + public void setOutcomeDirectory(String outcomeDirectory) { + this.outcomeDirectory = outcomeDirectory; + } + + /** * @return Returns the stderr. */ public String getStderr() { @@ -182,6 +201,14 @@ this.actionID = actionID; } + public String getExitCodeFileName() { + return exitCodeFileName; + } + + public void setExitCodeFileName(String exitCodeFileName) { + this.exitCodeFileName = exitCodeFileName; + } + public boolean isInteractive() { return isInteractive; } Modified: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/InternalManager.java =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/InternalManager.java 2011-03-31 07:46:44 UTC (rev 9642) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/InternalManager.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -34,8 +34,10 @@ package de.fzj.unicore.xnjs.ems; import java.io.Serializable; +import java.util.concurrent.TimeUnit; import de.fzj.unicore.xnjs.ems.event.EventHandler; +import de.fzj.unicore.xnjs.ems.event.XnjsEvent; /** @@ -99,6 +101,7 @@ * construct a sub action and start processing it * * @param jobDescription - the job description for the subaction + * @param type - the sub action type * @param parentAction - the parent action * @param notifyDone - whether to notify the parent when the subaction is done * @return the unique id of the new action @@ -142,4 +145,13 @@ * resume processing */ public void resumeProcessing(); + + /** + * process the given event after the defined delay + * + * @param event - the event to process + * @param delay - delay + * @param unit - delay units + */ + public void scheduleEvent(final XnjsEvent event, int delay, TimeUnit unit); } Modified: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/ProcessingContext.java =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/ProcessingContext.java 2011-03-31 07:46:44 UTC (rev 9642) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/ProcessingContext.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -33,4 +33,14 @@ } + /** + * set, using the value's class as key + * @param <T> + * @param value + * @return + */ + public <T> void set(T value){ + Class<?>key=value.getClass(); + put(key,value); + } } Modified: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/processors/DefaultProcessor.java =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/processors/DefaultProcessor.java 2011-03-31 07:46:44 UTC (rev 9642) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/processors/DefaultProcessor.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -33,6 +33,8 @@ package de.fzj.unicore.xnjs.ems.processors; +import java.util.concurrent.TimeUnit; + import org.apache.log4j.Logger; import de.fzj.unicore.xnjs.Configuration; @@ -41,6 +43,7 @@ import de.fzj.unicore.xnjs.ems.ActionStatus; import de.fzj.unicore.xnjs.ems.ProcessingException; import de.fzj.unicore.xnjs.ems.Processor; +import de.fzj.unicore.xnjs.ems.event.ContinueProcessingEvent; import de.fzj.unicore.xnjs.util.LogUtil; /** @@ -74,6 +77,10 @@ action.setStatus(ActionStatus.RUNNING); } + /** + * handle the "removing" state.<br/> + * This default implementation cleans up the working directory + */ protected void handleRemoving()throws ProcessingException{ try{ configuration.getExecutionContextMgr().destroyContext(action); @@ -125,6 +132,27 @@ } /** + * utility method which sets the action status to DONE and the result + * to NOT_SUCCESSFUL with the reason given + * @param reason + */ + protected void setToDoneAndFailed(String reason){ + action.setStatus(ActionStatus.DONE); + action.setResult(new ActionResult(ActionResult.NOT_SUCCESSFUL,reason)); + } + + + /** + * send the action to sleep for the specified time in millis, by + * setting the "waiting" flag and scheduling a "continue" event + * @param millis + */ + protected void sleep(int millis){ + action.setWaiting(true); + manager.scheduleEvent(new ContinueProcessingEvent(action.getUUID()), millis, TimeUnit.MILLISECONDS); + } + + /** * allow to set action for unit testing * @param a - the action */ Modified: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/processors/UsageLogger.java =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/processors/UsageLogger.java 2011-03-31 07:46:44 UTC (rev 9642) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/ems/processors/UsageLogger.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -49,6 +49,8 @@ * standard logfile) * * @author schuller + * @author ml054 + * @author golbi */ public class UsageLogger extends DefaultProcessor { @@ -56,6 +58,8 @@ private static final Logger logger=LogUtil.getLogger(LogUtil.JOBS,UsageLogger.class); static final String USAGE_LOGGED="USAGE.logged"; + public final static String XNJS_TSI_MACHINE_NAME_PROP = "CLASSICTSI.machine"; + public final static String REAL_BSS_MACHINE_NAME_PROP = "RUS.bssMachine"; public UsageLogger(Configuration config){ super(config); @@ -91,11 +95,58 @@ exec=action.getExecutionContext().getExecutable(); } else exec=""; + + String bsid = action.getBSID(); + String localUserName = action.getClient().getUserName(); + String jobName = action.getJobName(); + + String machineName = configuration.getProperty(REAL_BSS_MACHINE_NAME_PROP); + if (machineName == null) + machineName = configuration.getProperty(XNJS_TSI_MACHINE_NAME_PROP); + + String vo = null; + if (action.getClient() != null && action.getClient().getVos() != null && + action.getClient().getVos().length > 0) { + String[] vos = action.getClient().getVos(); + StringBuilder vosb = new StringBuilder(); + int i=0; + for (; i<vos.length-1; i++) { + vosb.append(vos[i]); + vosb.append(","); + } + if (i<vos.length) + vosb.append(vos[i]); + } + + // set <null> when value if not defined + if (bsid == null) { + bsid = "<null>"; + } + if (localUserName == null) { + localUserName = "<null>"; + } + if (jobName == null) { + jobName = "<null>"; + } + if (machineName == null) { + machineName = "<null>"; + } + if (vo == null) { + vo = "<null>"; + } + + sb.append("[").append(result).append("] [ "); sb.append(exec).append("] [ "); sb.append(uid).append("] ["); - sb.append(client).append("]"); - + sb.append(client).append("] ["); + sb.append(bsid); + sb.append("] [").append(localUserName); + sb.append("] [").append(jobName); + sb.append("] [").append(machineName); + sb.append("] [").append(vo); + sb.append("]"); + return sb.toString(); } Modified: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/incarnation/IncarnationTweaker.java =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/incarnation/IncarnationTweaker.java 2011-03-31 07:46:44 UTC (rev 9642) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/incarnation/IncarnationTweaker.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -16,6 +16,7 @@ import de.fzj.unicore.xnjs.Configuration; import de.fzj.unicore.xnjs.ems.Action; import de.fzj.unicore.xnjs.jsdl.IncarnationDataBase; +import de.fzj.unicore.xnjs.management.Lifecycle; import de.fzj.unicore.xnjs.tsi.ApplicationInfo; import de.fzj.unicore.xnjs.util.LogUtil; @@ -25,6 +26,7 @@ * @author golbi * */ +@Lifecycle(isSingleton=true) public class IncarnationTweaker implements ITweaker { public static final String INCARNATION_TWEAKER_CONFIG_KEY = "XNJS.incarnationTweakerConfig"; private static final Logger log = LogUtil.getLogger(LogUtil.XNJS, IncarnationTweaker.class); Modified: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/GrounderImpl.java =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/GrounderImpl.java 2011-03-31 07:46:44 UTC (rev 9642) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/GrounderImpl.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -100,6 +100,8 @@ ApplicationInfo incarnated=null; ApplicationInfo result=new ApplicationInfo(); ApplicationType app=jd.getJobDefinition().getJobDescription().getApplication(); + if(app==null)return null; + String name=app.getApplicationName(); String version=app.getApplicationVersion(); @@ -249,7 +251,7 @@ ResourcesType rt=jd.getJobDefinition().getJobDescription().getResources(); ResourceSet incarnatedResources=incarnateResources(rt, client); if(incarnatedResources!=null){ - job.addLogTrace("Incarnated resources: "+incarnatedResources); + job.addLogTrace("Incarnated resources: "+incarnatedResources.printSelected()); } return incarnatedResources; } Modified: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/IncarnatedExecutionEnvironment.java =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/IncarnatedExecutionEnvironment.java 2011-03-31 07:46:44 UTC (rev 9642) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/IncarnatedExecutionEnvironment.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -27,6 +27,10 @@ private String executable; private String userCommand; private String userCommandArguments=""; + private String userPreCommand; + private boolean runUserPreCommandOnLoginNode; + private String userPostCommand; + private boolean runUserPostCommandOnLoginNode; private String redirectInput; private String commandlineTemplate=DEFAULT_CMDLINE; @@ -60,10 +64,18 @@ return options; } + /** + * get the main executable (e.g. "mpirun") + * @return + */ public String getExecutable() { return executable; } + /** + * get the user's command + * @return + */ public String getUserCommand() { return userCommand; } @@ -96,6 +108,39 @@ arguments.put(name,value); } + public void setUserPreCommand(String cmd){ + this.userPreCommand=cmd; + } + + public String getUserPreCommand(){ + return userPreCommand; + } + + public void setUserPostCommand(String cmd){ + this.userPostCommand=cmd; + } + + public String getUserPostCommand(){ + return userPostCommand; + } + + public boolean isRunUserPreCommandOnLoginNode() { + return runUserPreCommandOnLoginNode; + } + + public void setRunUserPreCommandOnLoginNode(boolean runUserPreCommandOnLoginNode) { + this.runUserPreCommandOnLoginNode = runUserPreCommandOnLoginNode; + } + + public boolean isRunUserPostCommandOnLoginNode() { + return runUserPostCommandOnLoginNode; + } + + public void setRunUserPostCommandOnLoginNode( + boolean runUserPostCommandOnLoginNode) { + this.runUserPostCommandOnLoginNode = runUserPostCommandOnLoginNode; + } + public void setCommandlineTemplate(String commandlineTemplate) { this.commandlineTemplate = commandlineTemplate; } @@ -167,11 +212,17 @@ for(String preCommand: preCommands){ target.append(preCommand).append("\n"); } + if(userPreCommand!=null && !isRunUserPreCommandOnLoginNode()){ + target.append(userPreCommand).append("\n"); + } doIncarnate(target); target.append("\n"); for(String postCommand: postCommands){ target.append(postCommand).append("\n"); } + if(userPostCommand!=null && !isRunUserPostCommandOnLoginNode()){ + target.append(userPostCommand).append("\n"); + } } public static IncarnatedExecutionEnvironment getDefault(){ Modified: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/Incarnation.java =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/Incarnation.java 2011-03-31 07:46:44 UTC (rev 9642) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/Incarnation.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -90,7 +90,7 @@ * * @param app * @param client - * @return POSIXApplication + * @return ApplicationInfo or <code>null</code> if no app info given in the job */ public ApplicationInfo incarnateApplication(Action job, Client client) throws ExecutionException; Modified: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/JSDLParser.java =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/JSDLParser.java 2011-03-31 07:46:44 UTC (rev 9642) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/JSDLParser.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -161,23 +161,26 @@ env.addPreCommand(cmd.getIncarnatedValue()); } } - String userPre=execEnvRequest.getUserPreCommand(); - if(userPre!=null){ - env.addPreCommand(userPre); + //user defined pre-command + if(execEnvRequest.getUserPreCommand()!=null){ + String userPre=execEnvRequest.getUserPreCommand().getStringValue(); + boolean login=execEnvRequest.getUserPreCommand().getRunOnLoginNode(); + env.setUserPreCommand(userPre); + env.setRunUserPreCommandOnLoginNode(login); } - //post-commands for(OptionType cmd: definedExecEnv.getPostCommandArray()){ if(isOptionDefined(cmd.getName(),execEnvRequest.getPostCommandArray())){ env.addPostCommand(cmd.getIncarnatedValue()); } } - - String userPost=execEnvRequest.getUserPostCommand(); - if(userPost!=null){ - env.addPostCommand(userPost); + //user defined post command + if(execEnvRequest.getUserPostCommand()!=null){ + String userPost=execEnvRequest.getUserPostCommand().getStringValue(); + boolean login=execEnvRequest.getUserPostCommand().getRunOnLoginNode(); + env.setUserPostCommand(userPost); + env.setRunUserPostCommandOnLoginNode(login); } - //allow user to override values if(execEnvRequest.getExecutableName()!=null){ env.setExecutable(execEnvRequest.getExecutableName()); Modified: xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/JSDLProcessor.java =================================================================== --- xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/JSDLProcessor.java 2011-03-31 07:46:44 UTC (rev 9642) +++ xnjs/tags/xnjs-module-core/xnjs-module-core-1.4.0-rc1/src/main/java/de/fzj/unicore/xnjs/jsdl/JSDLProcessor.java 2011-03-31 07:54:54 UTC (rev 9643) @@ -29,62 +29,86 @@ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. *********************************************************************************/ - + package de.fzj.unicore.xnjs.jsdl; import java.io.Serializable; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; +import org.ggf.schemas.jsdl.x2005.x11.jsdl.ApplicationDocument; +import org.ggf.schemas.jsdl.x2005.x11.jsdl.ApplicationType; import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionDocument; import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDescriptionType; import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobIdentificationType; +import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.POSIXApplicationDocument; import de.fzj.unicore.xnjs.Configuration; import de.fzj.unicore.xnjs.XNJSConstants; import de.fzj.unicore.xnjs.ems.Action; import de.fzj.unicore.xnjs.ems.ActionResult; import de.fzj.unicore.xnjs.ems.ActionStatus; +import de.fzj.unicore.xnjs.ems.ApplicationExecutionStatus; +import de.fzj.unicore.xnjs.ems.EMSStatistics; +import de.fzj.unicore.xnjs.ems.ExecutionContext; import de.fzj.unicore.xnjs.ems.ExecutionException; import de.fzj.unicore.xnjs.ems.IExecutionContextManager; import de.fzj.unicore.xnjs.ems.ProcessingException; +import de.fzj.unicore.xnjs.ems.event.ContinueProcessingEvent; +import de.fzj.unicore.xnjs.ems.event.XnjsEvent; import de.fzj.unicore.xnjs.ems.processors.DefaultProcessor; +import de.fzj.unicore.xnjs.jsdl.PrePostActionProcessor.SubCommand; +import de.fzj.unicore.xnjs.resources.ResourceSet; +import de.fzj.unicore.xnjs.simple.BasicExecution; +import de.fzj.unicore.xnjs.tsi.ApplicationInfo; import de.fzj.unicore.xnjs.tsi.IExecution; import de.fzj.unicore.xnjs.tsi.TSI; import de.fzj.unicore.xnjs.tsi.TSIBusyException; import de.fzj.unicore.xnjs.util.LogUtil; +import de.fzj.unicore.xnjs.util.XmlBeansUtils; +import eu.unicore.security.Client; /** * processor for JSDL actions * + * TODO refactor into a generic stage-in/execute/stage-out thing that will work for + * other job description variants as well + * * @author schuller */ public class JSDLProcessor extends DefaultProcessor { - + private static final Logger logger=LogUtil.getLogger(LogUtil.JOBS,JSDLProcessor.class); + + private static final String subactionkey_in="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_SUBACTION_STAGEIN"; + private static final String KEY_DELETEONTERMINATION="JSDL_DELETEFILES"; + private static final String subactionkey_out="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_SUBACTION_STAGEOUT"; + private static final String subactionkey_pre="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_SUBACTION_PRECOMMAND"; + private static final String subactionkey_post="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_SUBACTION_POSTCOMMAND"; + + //time stamp names + private static final String STARTTIME="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_START"; + private static final String ENDSTAGEIN="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_ENDSTAGEIN"; + private static final String SUBMITTED="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_SUBMITTED"; + private static final String START_RUNNING="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_START_RUNNING"; + private static final String STARTSTAGEOUT="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_STARTSTAGEOUT"; - public static final String subactionkey_in="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_SUBACTION_STAGEIN"; - public static final String KEY_DELETEONTERMINATION="JSDL_DELETEFILES"; - - public static final String subactionkey_out="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_SUBACTION_STAGEOUT"; - - public static final String STARTTIME="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_START"; - public static final String ENDSTAGEIN="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_ENDSTAGEIN"; - public static final String STARTSTAGEOUT="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_STARTSTAGEOUT"; - public static final String ENDTIME="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_END"; - + private static final String ENDTIME="JSDL_de.fzj.unicore.xnjs.jsdl.JSDLProcessor_END"; + /** * the execution interface */ - protected IExecution exec; - + protected final IExecution exec; + /** * the execution context manager */ - protected IExecutionContextManager ecm; - + protected final IExecutionContextManager ecm; + /** * default constructor */ @@ -93,7 +117,7 @@ exec=configuration.getExecutionInterface(); ecm=configuration.getExecutionContextMgr(); } - + protected void begin() throws ProcessingException { super.begin(); } @@ -122,6 +146,12 @@ } catch (NullPointerException e) { } } storeTimeStamp(STARTTIME); + try{ + extractFromJSDL(); + } + catch(ExecutionException ee){ + throw new ProcessingException(ee); + } if(JSDLUtils.hasStageIn(jdd)){ action.setStatus(ActionStatus.PREPROCESSING); action.addLogTrace("Status set to PREPROCESSING (staging in)."); @@ -142,68 +172,55 @@ action.addLogTrace("No staging in needed."); setToReady(); storeTimeStamp(ENDSTAGEIN); - + } - //processing problems? + //processing problems? }catch(Exception ex){ String msg="Error processing action "+ex.getMessage(); action.addLogTrace(msg); throw new ProcessingException(msg,ex); } - + } /** * handle state "PreProcessing" aka "Staging in" * @param a Action */ - @SuppressWarnings("unchecked") protected void handlePreProcessing() throws ProcessingException { try{ - //get id of sub action - String id=(String)action.getProcessingContext().get(subactionkey_in); - if(id==null){//no subaction - setToReady(); - return; + //get id of stage in sub action + String stageInActionID=(String)action.getProcessingContext().get(subactionkey_in); + if(stageInActionID!=null){ + handleStagingIn(stageInActionID); } - //check subaction status - Action subAction=null; - if(manager.isActionDone(id)){ - subAction=manager.getAction(id); - ActionResult r=subAction.getResult(); - action.addLogTrace("Stage-in log:"); - action.appendLogTraceFrom(subAction); - action.addLogTrace("Stage-in is DONE."); - if(r.isSuccessful()){ - action.addLogTrace("Staging in was SUCCESSFUL"); - setToReady(); - storeTimeStamp(ENDSTAGEIN); - //get thr list of files to delete after the job is done - //(to honor the JSDL DeleteOnTermination) - List<String> files=(List<String>)subAction.getProcessingContext().get(KEY_DELETEONTERMINATION); - action.getProcessingContext().put(KEY_DELETEONTERMINATION,files); - } - else{ - action.addLogTrace("Staging in was NOT SUCCESSFUL"); - StringBuilder errorDescription=new StringBuilder("Staging in was NOT SUCCESSFUL"); - if(r.getErrorMessage()!=null){ - errorDescription.append(": "+r.getErrorMessage()); - } - //TODO: policy on this or not? - setToDoneAndFailed(errorDescription.toString()); - } - configuration.getEMSManager().destroy(id, action.getClient()); - } }catch(Exception ex){ LogUtil.logException("Error processing stage-in.",ex); setToDoneAndFailed("Error processing stage-in."); } } + + @SuppressWarnings("unchecked") + protected void handleStagingIn(String id)throws ExecutionException, ProcessingException{ + String err=checkSubAction(subactionkey_in, "Stage in"); + if(err!=null){ + //TODO: policy on this or not? + setToDoneAndFailed(err); + }else if(!action.isWaiting()){ + Action subAction=manager.getAction(id); + storeTimeStamp(ENDSTAGEIN); + List<String> files=(List<String>)subAction.getProcessingContext().get(KEY_DELETEONTERMINATION); + action.getProcessingContext().put(KEY_DELETEONTERMINATION,files); + configuration.getEMSManager().destroy(id, action.getClient()); + setToReady(); + } + + } private void setToReady(){ action.setStatus(ActionStatus.READY); action.addLogTrace("Status set to READY."); } - + /** * handle "READY" state * There are two cases: @@ -227,41 +244,182 @@ goToPending(); } } - + } - + private void goToPending(){ action.setStatus(ActionStatus.PENDING); action.addLogTrace("Status set to PENDING."); return; } + /** * handle "PENDING" state * @param a Action */ protected void handlePending() throws ProcessingException{ - if(logger.isTraceEnabled())logger.trace("Handling PENDING state for Action " - +action.getUUID()); + if(logger.isTraceEnabled())logger.trace("Handling PENDING state for Action "+action.getUUID()); + + ApplicationExecutionStatus aes=action.getProcessingContext().get(ApplicationExecutionStatus.class); + if(aes==null){ + aes=new ApplicationExecutionStatus(); + action.getProcessingContext().set(aes); + } + + switch(aes.get()){ + case ApplicationExecutionStatus.CREATED: + setupPreCommand(); + break; + + case ApplicationExecutionStatus.PRECOMMAND_EXECUTION: + handlePreCommandRunning(); + break; + + case ApplicationExecutionStatus.PRECOMMAND_DONE: + submitMainExecutable(); + break; + + default: + throw new ProcessingException("Illegal state PENDING."); + } + } + + + + /** + * initialise pre-command execution + * @throws ProcessingException + */ + protected void setupPreCommand() throws ProcessingException{ try{ //first check if action has an application element - JobDefinitionDocument ajd=(JobDefinitionDocument)action.getAjd(); - - if(ajd.getJobDefinition().getJobDescription()==null){ - String msg="Empty job description. Setting to DONE."; - action.addLogTrace(msg); - setToDoneSuccessfully(); - return; + IncarnatedExecutionEnvironment ee=action.getExecutionEnvironment(); + if(ee==null || ee.getUserPreCommand()==null || !ee.isRunUserPreCommandOnLoginNode()){ + action.addLogTrace("No pre-command to execute"); + action.getProcessingContext().set(ApplicationExecutionStatus.precommandDone()); } - if(ajd.getJobDefinition().getJobDescription().getApplication()!=null) - { - action.setWaiting(true); - action.setStatus(ActionStatus.QUEUED); - exec.submit(action); + else{ + assurePrePostProcessingAvailable(); + String userPre=ee.getUserPreCommand(); + SubCommand cmd=new SubCommand(); + cmd.type="PRE"; + cmd.cmd=userPre; + String subID=createPrePostAction(cmd); + action.getProcessingContext().put(subactionkey_pre, subID); + action.getProcessingContext().set(ApplicationExecutionStatus.precommandRunning()); } + }catch(Exception ex){ + String msg="Could not setup pre-command: "+ex.getMessage(); + action.addLogTrace(msg); + setToDoneAndFailed(msg); + throw new ProcessingException(msg,ex); + } + } + + + protected void handlePreCommandRunning()throws ProcessingException{ + try{ + String id=(String)action.getProcessingContext().get(subactionkey_pre); + String err=checkSubAction(subactionkey_pre, "Pre-commands"); + if(err!=null){ + setToDoneAndFailed(err); + configuration.getEMSManager().destroy(id, action.getClient()); + } + else if(!action.isWaiting()){ + action.getProcessingContext().set(ApplicationExecutionStatus.precommandDone()); + configuration.getEMSManager().destroy(id, action.getClient()); + } + } + catch(ExecutionException ee){ + throw new ProcessingException(ee); + } + } + + + /** + * initialise post-command execution + * @throws ProcessingException + */ + protected void setupPostCommand() throws ProcessingException{ + try{ + IncarnatedExecutionEnvironment ee=action.getExecutionEnvironment(); + if(ee==null || ee.getUserPostCommand()==null || !ee.isRunUserPostCommandOnLoginNode()){ + action.addLogTrace("No post-command to execute"); + action.getProcessingContext().set(ApplicationExecutionStatus.done()); + } else{ + assurePrePostProcessingAvailable(); + String userPost=ee.getUserPostCommand(); + SubCommand cmd=new SubCommand(); + cmd.type="POST"; + cmd.cmd=userPost; + String subID=createPrePostAction(cmd); + action.getProcessingContext().put(subactionkey_post, subID); + action.getProcessingContext().set(ApplicationExecutionStatus.postcommandRunning()); + } + }catch(Exception ex){ + String msg="Could not setup post-command: "+ex.getMessage(); + action.addLogTrace(msg); + setToDoneAndFailed(msg); + throw new ProcessingException(msg,ex); + } + } + + protected void handlePostCommandRunning()throws ProcessingException{ + try{ + String id=(String)action.getProcessingContext().get(subactionkey_post); + String err=checkSubAction(subactionkey_post, "Post-commands"); + if(err!=null){ + setToDoneAndFailed(err); + configuration.getEMSManager().destroy(id, action.getClient()); + } + else if(!action.isWaiting()){ + action.getProcessingContext().set(ApplicationExecutionStatus.done()); + configuration.getEMSManager().destroy(id, action.getClient()); + } + } + catch(ExecutionException ee){ + throw new ProcessingException(ee); + } + + } + + private void assurePrePostProcessingAvailable(){ + if(!configuration.haveProcessingFor(XNJSConstants.jsdlPrePostActionType)){ + configuration.setProcessingChain(XNJSConstants.jsdlPrePostActionType, null, + new String[]{PrePostActionProcessor.class.getName()}); + } + } + + /** + * submits the given command array as a sub action + * + * @param cmds + * @return the ID of the sub action + * @throws ExecutionException + */ + protected String createPrePostAction(SubCommand cmd)throws ExecutionException{ + return manager.addSubAction(cmd, XNJSConstants.jsdlPrePostActionType, action, true); + } + + /** + * submit the main application + * @throws ProcessingException + */ + protected void submitMainExecutable() throws ProcessingException{ + try{ + ApplicationInfo appInfo=action.getApplicationInfo(); + if(appInfo==null){ action.addLogTrace("No application to execute, changing action status to POSTPROCESSING"); action.setStatus(ActionStatus.POSTPROCESSING); + return; } + action.setWaiting(true); + exec.submit(action); + ApplicationExecutionStatus aes=action.getProcessingContext().get(ApplicationExecutionStatus.class); + aes.set(ApplicationExecutionStatus.MAIN_EXECUTION); + action.setStatus(ActionStatus.QUEUED); + storeTimeStamp(SUBMITTED); }catch(ExecutionException ex){ String msg="Could not submit job: "+ex.getMessage(); action.addLogTrace(msg); @@ -299,6 +457,9 @@ */ protected void handleRunning()throws ProcessingException{ if(logger.isTraceEnabled())logger.trace("Handling RUNNING state for Action "+action.getUUID()); + if(action.getProcessingContext().get(START_RUNNING)==null){ + storeTimeStamp(START_RUNNING); + } try{ exec.updateStatus(action); if(action.getStatus()==ActionStatus.RUNNING){ @@ -311,12 +472,36 @@ } } + @Override + protected void handlePostProcessing() throws ProcessingException{ + ApplicationExecutionStatus aes=action.getProcessingContext().get(ApplicationExecutionStatus.class); + + switch(aes.get()){ + case ApplicationExecutionStatus.MAIN_EXECUTION: + case ApplicationExecutionStatus.MAIN_EXECUTION_DONE: + setupPostCommand(); + break; + + case ApplicationExecutionStatus.POSTCOMMAND_EXECUTION: + handlePostCommandRunning(); + break; + + case ApplicationExecutionStatus.DONE: + handleStageOut(); + break; + + default: + throw new ProcessingException("Illegal state: POSTPROCESSING with substate "+aes); + } + + } + + /** - * handle "STAGING_OUT" state - * @param a Action + * handle "STAGING_OUT" */ - protected void handlePostProcessing() throws ProcessingException{ + protected void handleStageOut() throws ProcessingException{ //get id of sub action String id=(String)action.getProcessingContext().get(subactionkey_out); if(id==null){ @@ -324,45 +509,194 @@ addStageOut(); } else{ - //check stage out status - try{ - if(manager.isActionDone(id)){ - Action subAction=manager.getAction(id); - action.addLogTrace("Stage-out log:"); - action.appendLogTraceFrom(subAction); - action.addLogTrace("Stage-out is DONE."); - ActionResult r=subAction.getResult(); - if(!r.isSuccessful()){ - action.addLogTrace("Staging out was NOT SUCCESSFUL"); - StringBuilder errorDescription=new StringBuilder("Staging out was NOT SUCCESSFUL"); - if(r.getErrorMessage()!=null){ - errorDescription.append(": "+r.getErrorMessage()); - } - //TODO: policy on this or not? - setToDoneAndFailed(errorDescription.toString()); + String err=checkSubAction(subactionkey_out, "Stage-out"); + if(err==null){ + setToDoneSuccessfully(); + }else if(!action.isWaiting()){ + setToDoneAndFailed(err); + } + } + } + + /** + * + * @param processingKey - the processing context key of the sub action + * @param name - the name (pre-commands,post-commands,etc) + * @return <code>null</code> if all OK, an error message otherwise + */ + protected String checkSubAction(String processingKey, String name)throws ProcessingException{ + //get id of sub action + String id=(String)action.getProcessingContext().get(processingKey); + try{ + if(manager.isActionDone(id)){ + Action subAction=manager.getAction(id); + action.addLogTrace(name+" log:"); + action.appendLogTraceFrom(subAction); + action.addLogTrace(name+" is DONE."); + ActionResult r=subAction.getResult(); + if(!r.isSuccessful()){ + action.addLogTrace(name+" was NOT SUCCESSFUL"); + StringBuilder errorDescription=new StringBuilder(name+" was NOT SUCCESSFUL"); + if(r.getErrorMessage()!=null){ + errorDescription.append(": "+r.getErrorMessage()); } - else{ - setToDoneSuccessfully(); - } + return errorDescription.toString(); } - }catch(Exception ex){ - throw new ProcessingException(ex); } + else{ + //still running, check again later + sleep(3000); + } + }catch(Exception ex){ + throw new ProcessingException("Can't check subaction for "+name,ex); } + return null; } - + + /** + * this method extracts JSDL specific info like {@link ApplicationInfo} from + * the job description and fills the proper fields in the current action + * + * @throws Exception + */ + protected void extractFromJSDL()throws ExecutionException{ + Incarnation grounder=configuration.getGrounder(); + Client client=action.getClient(); + ecm.getContext(action); + try{ + //do an incarnation now... + ApplicationInfo pa=grounder.incarnateApplication(action,client); + if(pa!=null && pa.getExecutable()==null) { + throw new ExecutionException("Application could not be mapped to an executable."); + } + action.setApplicationInfo(pa); + IncarnatedExecutionEnvironment iee=null; + iee=grounder.incarnateExecutionEnvironment(action, client); + if(iee!=null){ + action.addLogTrace("Incarnated execution environment <"+iee.getName()+"> with executable '"+iee.getExecutable()+"'"); + }else{ + iee=IncarnatedExecutionEnvironment.getDefault(); + action.addLogTrace("Using default execution environment."); + } + action.setExecutionEnvironment(iee); + + JobDefinitionDocument jd=(JobDefinitionDocument)((JobDefinitionDocument)action.getAjd()); + + //job name + JobIdentificationType ji=jd.getJobDefinition().getJobDescription().getJobIdentification(); + if(ji!=null){ + String jobName=ji.getJobName(); + action.setJobName(jobName); + } + + //incarnate resources + ResourceSet incarnated=grounder.incarnateResources(action,client); + action.getExecutionContext().setResourceRequest(incarnated); + updateExecutionContext(pa, action); + + //update the job description stored in the action + ApplicationDocument ad=ApplicationDocument.Factory.newInstance(); + ApplicationType at=ad.addNewApplication(); + POSIXApplicationDocument pad=new JSDLRenderer().render(pa); + XmlBeansUtils.append(pad,ad); + jd.getJobDefinition().getJobDescription().setApplication(at); + action.setAjd((Serializable)jd); + action.setDirty(); + + } catch (Exception e) { + logger.warn("Failure extracting JSDL info.",e); + //wrap and rethrow + throw new ExecutionException(e); + } + } + /** + * updates ExecutionContext with values from the incarnated application + * @param ec ExecutionContext to update + * @param appDescription + */ + public void updateExecutionContext(ApplicationInfo appDescription, Action job){ + ExecutionContext ec=job.getExecutionContext(); + + //store executable + String executable=appDescription.getExecutable(); + job.getExecutionContext().setExecutable(executable); + + //stdin, etc + if(appDescription.getStdout()!=null)ec.setStdout(appDescription.getStdout()); + if(appDescription.getStderr()!=null)ec.setStderr(appDescription.getStderr()); + if(appDescription.getStdin()!=null)ec.setStdin(appDescription.getStdin()); + //environment + HashMap<String,String>map=ec.getEnvironment(); + map.putAll(appDescription.getEnvironment()); + + //interactive execution + if(Boolean.parseBoolean(appDescription.getEnvironment().get("UC_PREFER_INTERACTIVE_EXECUTION"))){ + ec.setInteractive(true); + } + + String email=null; + //get user email from job annotation + try{ + JobDefinitionDocument jsdl=((JobDefinitionDocument)job.getAjd()); + email=extractEmail(jsdl); + }catch(Exception e){} + Client c=job.getClient(); + if(c!=null && email!=null)c.setUserEmail(email); + job.setDirty(); + } + + + + /** + * extract the email address from the job's JobAnnotation elements + * @param jsdl + * @return + */ + public static String extractEmail(JobDefinitionDocument jsdl){ + String email=null; + if(jsdl.getJobDefinition().getJobDescription().getJobIdentification()!=null){ + String[] annotations=jsdl.getJobDefinition().getJobDescription().getJobIdentification().getJobAnnotationArray(); + if(annotations!=null){ + for(String annotation: annotations){ + email=BasicExecution.getEmailAddress(annotation); + if(email!=null)break; + } + } + } + return email; + } + + private void storeTimeStamp(String key){ action.getProcessingContext().put(key, Long.valueOf(System.currentTimeMillis())); } + + private void storeStatistics(){ + Float timeQueued=getTimeQueued(action.getProcessingContext()); + if(timeQueued!=null){ + EMSStatistics stats=configuration.getComponentInstanceOfType(EMSStatistics.class); + stats.updateMeanTimeQueued(timeQueued.longValue()); + } + } + private static Float getTimeQueued(Map<Object,Object>context){ + Long submitToBSS=(Long)context.get(SUBMITTED); + Long startBSSExecution=(Long)context.get(START_RUNNING); + if(startBSSExecution!=null){ + return new Float((startBSSExecution-submitToBSS)); + } + else return null; + } + public static String getTimeProfile(Map<Object,Object>context){ try{ Long start=(Long)context.get(STARTTIME); Long endStageIn=(Long)context.get(ENDSTAGEIN); Long startStageOut=(Long)context.get(STARTSTAGEOUT); Long end=(Long)context.get(ENDTIME); + StringBuilder sb=new StringBuilder(); - + Float totalTime=new Float((end-start)/1000f); sb.append("Total: "+String.format("%.2f sec.", totalTime)); sb.append(", "); @@ -371,19 +705,26 @@ sb.append(", "); Float totalStageOut=new Float((end-startStageOut)/1000f); sb.append("Stage-out: "+String.format("%.2f sec.", totalStageOut)); + + Float timeQueued=getTimeQueued(context); + if(timeQueued!=null){ + timeQueued=timeQueued/1000f; + sb.append(", "); + sb.append("Time in queue: "+String.format("%.2f sec.", timeQueued)); + } //datamovement percentage Float dataPercentage=100 * (totalStag... [truncated message content] |