|
From: <ha...@us...> - 2007-08-20 22:39:38
|
Revision: 1696
http://cogkit.svn.sourceforge.net/cogkit/?rev=1696&view=rev
Author: hategan
Date: 2007-08-20 15:39:11 -0700 (Mon, 20 Aug 2007)
Log Message:
-----------
added stdoutLocation, stderrLocation, and failOnJobError
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/GridExec.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/GridExec.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/GridExec.java 2007-08-20 22:34:12 UTC (rev 1695)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/GridExec.java 2007-08-20 22:39:11 UTC (rev 1696)
@@ -15,9 +15,12 @@
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.common.StatusEvent;
+import org.globus.cog.abstraction.impl.common.execution.JobException;
import org.globus.cog.abstraction.impl.common.task.GenericTaskHandler;
import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
import org.globus.cog.abstraction.impl.common.task.TaskImpl;
+import org.globus.cog.abstraction.interfaces.FileLocation;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Service;
import org.globus.cog.abstraction.interfaces.StatusListener;
import org.globus.cog.abstraction.interfaces.Task;
@@ -43,6 +46,8 @@
public static final Arg A_HOST = new Arg.Optional("host");
public static final Arg A_STDOUT = new Arg.Optional("stdout");
public static final Arg A_STDERR = new Arg.Optional("stderr");
+ public static final Arg A_STDOUTLOCATION = new Arg.Optional("stdoutLocation");
+ public static final Arg A_STDERRLOCATION = new Arg.Optional("stderrLocation");
public static final Arg A_STDIN = new Arg.Optional("stdin");
public static final Arg A_PROVIDER = new Arg.Optional("provider");
public static final Arg A_SECURITY_CONTEXT = new Arg.Optional("securitycontext");
@@ -62,13 +67,14 @@
public static final Arg A_DELEGATION = new Arg.Optional("delegation", Boolean.FALSE);
public static final Arg.Channel C_ENVIRONMENT = new Arg.Channel("environment");
public static final Arg A_ATTRIBUTES = new Arg.Optional("attributes", Collections.EMPTY_MAP);
+ public static final Arg A_FAIL_ON_JOB_ERROR = new Arg.Optional("failonjoberror", Boolean.TRUE);
static {
setArguments(GridExec.class, new Arg[] { A_EXECUTABLE, A_ARGS, A_ARGUMENTS, A_HOST,
- A_STDOUT, A_STDERR, A_STDIN, A_PROVIDER, A_COUNT, A_JOBTYPE, A_MAXTIME,
- A_MAXWALLTIME, A_MAXCPUTIME, A_ENVIRONMENT, A_QUEUE, A_PROJECT, A_MINMEMORY,
- A_MAXMEMORY, A_REDIRECT, A_SECURITY_CONTEXT, A_DIRECTORY, A_NATIVESPEC,
- A_DELEGATION, A_ATTRIBUTES, C_ENVIRONMENT });
+ A_STDOUT, A_STDERR, A_STDOUTLOCATION, A_STDERRLOCATION, A_STDIN, A_PROVIDER,
+ A_COUNT, A_JOBTYPE, A_MAXTIME, A_MAXWALLTIME, A_MAXCPUTIME, A_ENVIRONMENT, A_QUEUE,
+ A_PROJECT, A_MINMEMORY, A_MAXMEMORY, A_REDIRECT, A_SECURITY_CONTEXT, A_DIRECTORY,
+ A_NATIVESPEC, A_DELEGATION, A_ATTRIBUTES, C_ENVIRONMENT, A_FAIL_ON_JOB_ERROR });
}
public void submitTask(VariableStack stack) throws ExecutionException {
@@ -79,9 +85,10 @@
Iterator i = named.getNames();
Object host = null;
String provider = null;
+ boolean redirect = false;
while (i.hasNext()) {
String name = (String) i.next();
-
+
Object value = named.getArgument(name);
if (name.equals(A_EXECUTABLE.getName())) {
js.setExecutable(TypeUtil.toString(value));
@@ -98,7 +105,7 @@
}
}
else if (name.equals(A_REDIRECT.getName())) {
- js.setRedirected(TypeUtil.toBoolean(value));
+ redirect = TypeUtil.toBoolean(value);
}
else if (name.equals(A_STDIN.getName())) {
js.setStdInput(TypeUtil.toString(value));
@@ -106,9 +113,15 @@
else if (name.equals(A_STDOUT.getName())) {
js.setStdOutput(TypeUtil.toString(value));
}
+ else if (name.equals(A_STDOUTLOCATION.getName())) {
+ js.setStdOutputLocation(new FileLocation.Impl(TypeUtil.toInt(value)));
+ }
else if (name.equals(A_STDERR.getName())) {
js.setStdError(TypeUtil.toString(value));
}
+ else if (name.equals(A_STDERRLOCATION.getName())) {
+ js.setStdErrorLocation(new FileLocation.Impl(TypeUtil.toInt(value)));
+ }
else if (name.equals(A_DIRECTORY.getName())) {
js.setDirectory(TypeUtil.toString(value));
}
@@ -165,7 +178,8 @@
js.setAttribute((String) e.getKey(), e.getValue());
}
catch (ClassCastException ex) {
- throw new ExecutionException("Invalid attribute name (" + e.getKey() + ")", ex);
+ throw new ExecutionException("Invalid attribute name (" + e.getKey()
+ + ")", ex);
}
}
}
@@ -173,7 +187,12 @@
js.setAttribute(name, value);
}
}
-
+
+ if (redirect) {
+ js.setStdOutputLocation(FileLocation.MEMORY);
+ js.setStdErrorLocation(FileLocation.MEMORY);
+ }
+
VariableArguments env = C_ENVIRONMENT.get(stack);
Iterator j = env.iterator();
while (j.hasNext()) {
@@ -253,42 +272,50 @@
throw new ExecutionException("Exception caught while submitting job", e);
}
}
-
+
private List stringify(List l) {
- ArrayList sl = new ArrayList(l.size());
- Iterator i = l.iterator();
- while (i.hasNext()) {
- sl.add(TypeUtil.toString(i.next()));
- }
- return sl;
+ ArrayList sl = new ArrayList(l.size());
+ Iterator i = l.iterator();
+ while (i.hasNext()) {
+ sl.add(TypeUtil.toString(i.next()));
+ }
+ return sl;
}
protected void taskFailed(StatusEvent e, VariableStack stack) throws ExecutionException {
Task task = (Task) e.getSource();
- if (task.getStdError() != null) {
- failImmediately(stack, task.getStdError());
+ returnOutputs(task, stack);
+ Exception ex = e.getStatus().getException();
+ if (ex instanceof JobException && !TypeUtil.toBoolean(A_FAIL_ON_JOB_ERROR.getValue(stack))) {
+ Arg.VARGS.ret(stack, new Integer(((JobException) ex).getExitCode()));
+ super.taskCompleted(e, stack);
}
- else if (e.getStatus().getException() != null) {
- failImmediately(stack, e.getStatus().getException());
- }
- else if (e.getStatus().getMessage() != null) {
- failImmediately(stack, e.getStatus().getMessage());
- }
else {
- failImmediately(stack, "Task failed");
+ if (e.getStatus().getException() != null) {
+ failImmediately(stack, e.getStatus().getException());
+ }
+ else if (e.getStatus().getMessage() != null) {
+ failImmediately(stack, e.getStatus().getMessage());
+ }
+ else {
+ failImmediately(stack, "Task failed");
+ }
}
}
protected void taskCompleted(StatusEvent e, VariableStack stack) throws ExecutionException {
- if (TypeUtil.toBoolean(A_REDIRECT.getValue(stack))) {
- Task t = (Task) e.getSource();
- if (t.getStdOutput() != null) {
- STDOUT.ret(stack, t.getStdOutput());
- }
- if (t.getStdError() != null) {
- STDERR.ret(stack, t.getStdError());
- }
- }
+ Task t = (Task) e.getSource();
+ returnOutputs(t, stack);
super.taskCompleted(e, stack);
}
+
+ protected void returnOutputs(Task t, VariableStack stack) throws ExecutionException {
+ JobSpecification spec = (JobSpecification) t.getSpecification();
+ if (t.getStdOutput() != null && FileLocation.MEMORY.overlaps(spec.getStdOutputLocation())) {
+ STDOUT.ret(stack, t.getStdOutput());
+ }
+ if (t.getStdError() != null && FileLocation.MEMORY.overlaps(spec.getStdErrorLocation())) {
+ STDERR.ret(stack, t.getStdError());
+ }
+ }
}
\ No newline at end of file
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|