|
From: <ha...@us...> - 2007-02-13 01:09:18
|
Revision: 1579
http://svn.sourceforge.net/cogkit/?rev=1579&view=rev
Author: hategan
Date: 2007-02-12 17:09:16 -0800 (Mon, 12 Feb 2007)
Log Message:
-----------
changed the way polling is done with PBS
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/QueuePoller.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/QueuePoller.java
Modified: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/QueuePoller.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/QueuePoller.java 2007-02-13 00:49:05 UTC (rev 1578)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/QueuePoller.java 2007-02-13 01:09:16 UTC (rev 1579)
@@ -27,6 +27,7 @@
public static final Logger logger = Logger.getLogger(QueuePoller.class);
private LinkedList newjobs, donejobs;
+ private Set processed;
private Map jobs;
boolean any = false;
private int sleepTime;
@@ -38,6 +39,7 @@
newjobs = new LinkedList();
donejobs = new LinkedList();
sleepTime = Properties.getProperties().getPollInterval() * 1000;
+ processed = new HashSet();
}
public void addJob(Job job) {
@@ -156,8 +158,6 @@
return null;
}
- private static Set processed = new HashSet();
-
protected void processStdout(InputStream is) throws IOException {
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String line;
Modified: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/QueuePoller.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/QueuePoller.java 2007-02-13 00:49:05 UTC (rev 1578)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/QueuePoller.java 2007-02-13 01:09:16 UTC (rev 1579)
@@ -14,9 +14,11 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
+import java.util.Set;
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.scheduler.common.Job;
@@ -24,8 +26,8 @@
public class QueuePoller extends Thread {
public static final Logger logger = Logger.getLogger(QueuePoller.class);
- private String qstat;
private LinkedList newjobs, donejobs;
+ private Set processed;
private Map jobs;
boolean any = false;
private int sleepTime;
@@ -37,7 +39,7 @@
newjobs = new LinkedList();
donejobs = new LinkedList();
sleepTime = Properties.getProperties().getPollInterval() * 1000;
- qstat = Properties.getProperties().getQStat();
+ processed = new HashSet();
}
public void addJob(Job job) {
@@ -122,7 +124,8 @@
jobs.clear();
}
- private static String[] cmdarray;
+ private static final String[] QSTAT = new String[] {
+ Properties.getProperties().getQStat(), "-f" };
protected void pollQueue() {
try {
@@ -132,18 +135,7 @@
if (jobs.size() == 0) {
return;
}
- if (cmdarray == null || cmdarray.length != jobs.size() + 2) {
- cmdarray = new String[jobs.size() + 2];
- cmdarray[0] = qstat;
- cmdarray[1] = "-f";
- }
- Iterator i = jobs.keySet().iterator();
- int j = 2;
- while (i.hasNext()) {
- cmdarray[j] = (String) i.next();
- j++;
- }
- Process pqstat = Runtime.getRuntime().exec(cmdarray);
+ Process pqstat = Runtime.getRuntime().exec(QSTAT);
int ec = pqstat.waitFor();
if (ec != 0) {
failAll("QStat failed (exit code " + ec + ")");
@@ -158,6 +150,7 @@
protected void processStdout(InputStream is) throws IOException {
BufferedReader br = new BufferedReader(new InputStreamReader(is));
+ processed.clear();
String line;
String currentJobID = null;
Job currentJob = null;
@@ -167,6 +160,7 @@
line = line.trim();
if (line.startsWith("Job Id: ")) {
currentJobID = line.substring("Job Id: ".length());
+ processed.add(currentJobID);
currentJob = (Job) jobs.get(currentJobID);
continue;
}
@@ -195,37 +189,23 @@
}
}
} while (line != null);
+ Iterator i = jobs.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ String id = (String) e.getKey();
+ if (!processed.contains(id)) {
+ Job job = (Job) e.getValue();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Status for " + id + " is Done");
+ }
+ job.setState(Job.STATE_DONE);
+ if (job.getState() == Job.STATE_DONE) {
+ donejobs.add(id);
+ }
+ }
+ }
}
protected void processStderr(InputStream is) throws IOException {
- BufferedReader br = new BufferedReader(new InputStreamReader(is));
- String line;
- do {
- line = br.readLine();
- if (line != null) {
- line = line.trim();
- if (line.startsWith("qstat: Unknown Job Id ")) {
- String jobid = line.substring("qstat: Unknown Job Id "
- .length());
- Job job = (Job) jobs.get(jobid);
- if (job != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("Status for " + jobid + " is Done");
- }
-
- /*
- * The state setting is vetoable since the exit code file must
- * be present on the disk before the job can be marked as done.
- * This is done in order to be safe with the NFS case, but it also
- * needs to be done so that it works with my PBS emulator ;)
- */
- job.setState(Job.STATE_DONE);
- if (job.getState() == Job.STATE_DONE) {
- donejobs.add(jobid);
- }
- }
- }
- }
- } while (line != null);
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2007-02-15 03:48:44
|
Revision: 1587
http://svn.sourceforge.net/cogkit/?rev=1587&view=rev
Author: hategan
Date: 2007-02-14 19:48:41 -0800 (Wed, 14 Feb 2007)
Log Message:
-----------
made cobalt actually work
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/CobaltExecutor.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/QueuePoller.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/Job.java
Added Paths:
-----------
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/CobaltJob.java
Modified: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/CobaltExecutor.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/CobaltExecutor.java 2007-02-14 20:15:51 UTC (rev 1586)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/CobaltExecutor.java 2007-02-15 03:48:41 UTC (rev 1587)
@@ -10,20 +10,16 @@
package org.globus.cog.abstraction.impl.scheduler.cobalt;
import java.io.BufferedReader;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.log4j.Logger;
-import org.globus.cog.abstraction.impl.scheduler.common.Job;
import org.globus.cog.abstraction.impl.scheduler.common.ProcessException;
import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
import org.globus.cog.abstraction.interfaces.JobSpecification;
@@ -38,8 +34,7 @@
private Task task;
private static QueuePoller poller;
private ProcessListener listener;
- private String stdout, stderr, exitcode;
- private File script;
+ private String stdout, stderr;
private String cqsub;
private static final String[] EMPTY_STRING_ARRAY = new String[0];
@@ -69,19 +64,13 @@
throw new IOException("Failed to create script directory ("
+ scriptdir + ")");
}
- script = File.createTempFile("cobalt", ".sh", scriptdir);
- stdout = spec.getStdOutput() == null ? script.getAbsolutePath()
- + ".stdout" : spec.getStdOutput();
- stderr = spec.getStdError() == null ? script.getAbsolutePath()
- + ".stderr" : spec.getStdError();
- exitcode = script.getAbsolutePath() + ".exitcode";
- writeScript(new BufferedWriter(new FileWriter(script)), exitcode,
- stdout, stderr);
- if (logger.isDebugEnabled()) {
- logger.debug("Wrote submit script to " + script);
- }
- String[] cmdline = buildCMDLine(script.getAbsolutePath());
+ stdout = File.createTempFile("cobalt", ".stdout", scriptdir)
+ .getAbsolutePath();
+ stderr = stdout.substring(0, stdout.length() - ".stdout".length())
+ + ".stderr";
+ String[] cmdline = buildCMDLine(stdout, stderr);
+
Process process = Runtime.getRuntime().exec(cmdline, null, null);
try {
@@ -93,13 +82,15 @@
try {
int code = process.waitFor();
if (code != 0) {
+ // grr. cqsub outputs error messages on stdout
throw new ProcessException(
- "Could not submit job (qsub reported an exit code of "
+ "Could not submit job (cqsub reported an exit code of "
+ code + "). "
- + getOutput(process.getErrorStream()));
+ + getOutput(process.getErrorStream())
+ + getOutput(process.getInputStream()));
}
if (logger.isDebugEnabled()) {
- logger.debug("QSub done (exit code " + code + ")");
+ logger.debug("cqsub done (exit code " + code + ")");
}
}
catch (InterruptedException e) {
@@ -113,57 +104,38 @@
}
String jobid = getOutput(process.getInputStream());
+ if (jobid == null || jobid.equals("")) {
+ throw new IOException("cqsub returned empty job ID");
+ }
getProcessPoller().addJob(
- new Job(jobid, spec.isRedirected() ? stdout : null, spec
- .isRedirected() ? stderr : null, exitcode, this));
+ new CobaltJob(jobid, spec.isRedirected(), stdout, stderr, spec
+ .getStdOutput(), spec.getStdError(), this));
}
private void error(String message) {
listener.processFailed(message);
}
- protected void writeScript(Writer wr, String exitcodefile, String stdout,
- String stderr) throws IOException {
- if (spec.getStdInput() != null) {
- throw new IOException("The Cobalt provider cannot redirect STDIN");
- }
- Iterator i = spec.getEnvironmentVariableNames().iterator();
- while (i.hasNext()) {
- String name = (String) i.next();
- wr.write(name);
- wr.write('=');
- wr.write(quote(spec.getEnvironmentVariable(name)));
- wr.write('\n');
- }
- wr.write(quote(spec.getExecutable()));
- List args = spec.getArgumentsAsList();
- if (args != null && args.size() > 0) {
- wr.write(' ');
- i = args.iterator();
- while (i.hasNext()) {
- wr.write(quote((String) i.next()));
- wr.write(' ');
- }
- }
- wr.write(" 1>" + quote(stdout) + ' ');
- wr.write(" 2>" + quote(stderr) + '\n');
- wr.write("/bin/echo $? >" + exitcodefile + '\n');
- wr.close();
- }
-
protected void addAttr(String attrName, String option, List l) {
+ addAttr(attrName, option, l, null);
+ }
+
+ protected void addAttr(String attrName, String option, List l, String defval) {
Object value = spec.getAttribute(attrName);
if (value != null) {
l.add(option);
l.add(String.valueOf(value));
}
+ else if (defval != null) {
+ l.add(option);
+ l.add(defval);
+ }
}
- protected String[] buildCMDLine(String script) {
+ protected String[] buildCMDLine(String stdout, String stderr) {
List l = new ArrayList();
l.add(cqsub);
- addAttr("queue", "-q", l);
Collection names = spec.getEnvironmentVariableNames();
if (names != null && names.size() > 0) {
l.add("-e");
@@ -181,17 +153,25 @@
l.add(sb.toString());
}
addAttr("mode", "-m", l);
- //We're gonna treat this as the process count
- addAttr("count", "-c", l);
+ // We're gonna treat this as the node count
+ addAttr("count", "-n", l, "1");
addAttr("project", "-p", l);
addAttr("queue", "-q", l);
- addAttr("maxwalltime", "-t", l);
+ // cqsub seems to require both the node count and time args
+ addAttr("maxwalltime", "-t", l, "10");
if (spec.getDirectory() != null) {
l.add("-C");
l.add(spec.getDirectory());
}
- l.add("/bin/sh");
- l.add(script);
+ l.add("-o");
+ l.add(stdout);
+ l.add("-E");
+ l.add(stderr);
+ l.add(spec.getExecutable());
+ List args = spec.getArgumentsAsList();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Cqsub cmd line: " + l);
+ }
return (String[]) l.toArray(EMPTY_STRING_ARRAY);
}
@@ -228,21 +208,15 @@
if (logger.isDebugEnabled()) {
logger.debug("Output from qsub is: \"" + out + "\"");
}
- if ("".equals(out)) {
- throw new IOException("Qsub returned empty job ID");
+ if (out == null) {
+ out = "";
}
return out;
}
protected void cleanup() {
- script.delete();
- new File(exitcode).delete();
- if (spec.getStdOutput() == null && stdout != null) {
- new File(stdout).delete();
- }
- if (spec.getStdError() == null && stderr != null) {
- new File(stderr).delete();
- }
+ new File(stdout).delete();
+ new File(stderr).delete();
}
public void processCompleted(int exitCode) {
Added: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/CobaltJob.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/CobaltJob.java (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/CobaltJob.java 2007-02-15 03:48:41 UTC (rev 1587)
@@ -0,0 +1,172 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 11, 2005
+ */
+package org.globus.cog.abstraction.impl.scheduler.cobalt;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.CharArrayWriter;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessException;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
+
+public class CobaltJob extends Job {
+ public static final Logger logger = Logger.getLogger(CobaltJob.class);
+
+ private String stdout, stderr, tstdout, tstderr;
+ private boolean redirect;
+ private int exitcode;
+
+ public CobaltJob(String jobID, boolean redirect, String stdout,
+ String stderr, String tstdout, String tstderr,
+ ProcessListener listener) {
+ super(jobID, null, null, null, listener);
+ this.redirect = redirect;
+ this.stdout = stdout;
+ this.stderr = stderr;
+ this.tstdout = tstdout;
+ this.tstderr = tstderr;
+ int exitcode = Integer.MIN_VALUE;
+ }
+
+ public boolean close() {
+ if (processStderr() && processStdout()) {
+ if (exitcode == Integer.MIN_VALUE) {
+ listener.processFailed("Did not find the exitcode in the logs");
+ }
+ else {
+ listener.processCompleted(exitcode);
+ }
+ }
+ return true;
+ }
+
+ protected boolean processStdout() {
+ try {
+ CharArrayWriter caw = null;
+ if (redirect) {
+ caw = new CharArrayWriter();
+ }
+ BufferedWriter bw = null;
+ if (tstdout != null) {
+ bw = new BufferedWriter(new FileWriter(tstdout));
+ }
+ BufferedReader br = new BufferedReader(new FileReader(stdout));
+ String line;
+ do {
+ line = br.readLine();
+ if (line != null) {
+ if (redirect) {
+ caw.write(line);
+ caw.write('\n');
+ }
+ else if (tstdout != null) {
+ bw.write(line);
+ bw.write('\n');
+ }
+ }
+ } while (line != null);
+ br.close();
+ if (caw != null) {
+ caw.close();
+ listener.stdoutUpdated(caw.toString());
+ }
+ if (bw != null) {
+ bw.close();
+ }
+ return true;
+ }
+ catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Exception caught while reading STDOUT", e);
+ }
+ listener.processFailed(new ProcessException(
+ "Exception caught while reading STDOUT", e));
+ return false;
+ }
+ }
+
+ protected boolean processStderr() {
+ try {
+ CharArrayWriter caw = null;
+ if (redirect) {
+ caw = new CharArrayWriter();
+ }
+ BufferedWriter bw = null;
+ if (tstdout != null) {
+ bw = new BufferedWriter(new FileWriter(tstderr));
+ }
+ BufferedReader br = new BufferedReader(new FileReader(stderr));
+ String line;
+ boolean started = false;
+
+ do {
+ line = br.readLine();
+ if (line != null) {
+ if (line.startsWith("<") && line.indexOf("(Info)") != -1
+ && line.indexOf("Starting job") != -1) {
+ started = true;
+ }
+ if (started) {
+ if (!line.startsWith("<")
+ || line.indexOf("(Info)") == -1) {
+ if (redirect) {
+ caw.write(line);
+ caw.write('\n');
+ }
+ else if (tstdout != null) {
+ bw.write(line);
+ bw.write('\n');
+ }
+ }
+ else {
+ int index = line.indexOf("BG/L job exit status =");
+ if (index != -1) {
+ String es = line.substring(index +
+ "BG/L job exit status =".length()).trim();
+ if (!es.startsWith("(") && !es.endsWith(")")) {
+ throw new IOException(
+ "Could not parse job exit status. Invalid exit status line: "
+ + line);
+ }
+ else {
+ exitcode = Integer.parseInt(es.substring(1, es.length() - 1));
+ }
+ }
+ }
+ }
+
+ }
+ } while (line != null);
+ br.close();
+ if (caw != null) {
+ caw.close();
+ listener.stderrUpdated(caw.toString());
+ }
+ if (bw != null) {
+ bw.close();
+ }
+
+ return true;
+ }
+ catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Exception caught while reading STDERR", e);
+ }
+ listener.processFailed(new ProcessException(
+ "Exception caught while reading STDERR", e));
+ return false;
+ }
+ }
+}
Modified: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/QueuePoller.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/QueuePoller.java 2007-02-14 20:15:51 UTC (rev 1586)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/QueuePoller.java 2007-02-15 03:48:41 UTC (rev 1587)
@@ -187,7 +187,7 @@
if (job == null) {
continue;
}
- processed.add(job);
+ processed.add(jobid);
if (state.equals("queued")) {
if (logger.isDebugEnabled()) {
logger.debug("Status for " + jobid + " is Q");
Modified: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/Job.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/Job.java 2007-02-14 20:15:51 UTC (rev 1586)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/Job.java 2007-02-15 03:48:41 UTC (rev 1587)
@@ -28,8 +28,8 @@
private String jobID;
private String exitcodeFileName;
private String stdout, stderr;
- private ProcessListener listener;
- private int state;
+ protected ProcessListener listener;
+ protected int state;
private int ticks;
public Job(String jobID, String stdout, String stderr,
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2007-09-19 20:44:24
|
Revision: 1752
http://cogkit.svn.sourceforge.net/cogkit/?rev=1752&view=rev
Author: hategan
Date: 2007-09-19 13:44:23 -0700 (Wed, 19 Sep 2007)
Log Message:
-----------
don't set status if exception is thrown
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/JobSubmissionTaskHandler.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/JobSubmissionTaskHandler.java 2007-09-19 20:43:48 UTC (rev 1751)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/JobSubmissionTaskHandler.java 2007-09-19 20:44:23 UTC (rev 1752)
@@ -68,12 +68,6 @@
}
}
catch (Exception e) {
- Status newStatus = new StatusImpl();
- Status oldStatus = this.task.getStatus();
- newStatus.setPrevStatusCode(oldStatus.getStatusCode());
- newStatus.setStatusCode(Status.FAILED);
- newStatus.setException(e);
- this.task.setStatus(newStatus);
if (e.getMessage() != null) {
throw new TaskSubmissionException("Cannot submit job: "
+ e.getMessage(), e);
Modified: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/JobSubmissionTaskHandler.java 2007-09-19 20:43:48 UTC (rev 1751)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/JobSubmissionTaskHandler.java 2007-09-19 20:44:23 UTC (rev 1752)
@@ -68,12 +68,6 @@
}
}
catch (Exception e) {
- Status newStatus = new StatusImpl();
- Status oldStatus = this.task.getStatus();
- newStatus.setPrevStatusCode(oldStatus.getStatusCode());
- newStatus.setStatusCode(Status.FAILED);
- newStatus.setException(e);
- this.task.setStatus(newStatus);
if (e.getMessage() != null) {
throw new TaskSubmissionException("Cannot submit job: "
+ e.getMessage(), e);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-02-12 17:32:36
|
Revision: 1894
http://cogkit.svn.sourceforge.net/cogkit/?rev=1894&view=rev
Author: hategan
Date: 2008-02-12 09:32:32 -0800 (Tue, 12 Feb 2008)
Log Message:
-----------
fixed some weirdness with the handlers
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/JobSubmissionTaskHandler.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/JobSubmissionTaskHandler.java 2008-02-12 17:32:00 UTC (rev 1893)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/JobSubmissionTaskHandler.java 2008-02-12 17:32:32 UTC (rev 1894)
@@ -41,6 +41,7 @@
}
else {
this.task = task;
+ task.setStatus(Status.SUBMITTING);
try {
spec = (JobSpecification) this.task.getSpecification();
}
@@ -57,13 +58,15 @@
}
try {
- new CobaltExecutor(task, this).start();
- // check if the task has not been canceled after it was
- // submitted for execution
- if (this.task.getStatus().getStatusCode() == Status.UNSUBMITTED) {
- this.task.setStatus(Status.SUBMITTED);
- if (spec.isBatchJob()) {
- this.task.setStatus(Status.COMPLETED);
+ synchronized(this) {
+ // check if the task has not been canceled after it was
+ // submitted for execution
+ if (this.task.getStatus().getStatusCode() != Status.CANCELED) {
+ new CobaltExecutor(task, this).start();
+ this.task.setStatus(Status.SUBMITTED);
+ if (spec.isBatchJob()) {
+ this.task.setStatus(Status.COMPLETED);
+ }
}
}
}
@@ -87,8 +90,9 @@
TaskSubmissionException {
}
- public void cancel() throws InvalidSecurityContextException,
+ public synchronized void cancel() throws InvalidSecurityContextException,
TaskSubmissionException {
+ //TODO qdel?
this.task.setStatus(Status.CANCELED);
}
Modified: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/JobSubmissionTaskHandler.java 2008-02-12 17:32:00 UTC (rev 1893)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/JobSubmissionTaskHandler.java 2008-02-12 17:32:32 UTC (rev 1894)
@@ -41,6 +41,7 @@
}
else {
this.task = task;
+ task.setStatus(Status.SUBMITTING);
try {
spec = (JobSpecification) this.task.getSpecification();
}
@@ -57,13 +58,13 @@
}
try {
- new PBSExecutor(task, this).start();
- // check if the task has not been canceled after it was
- // submitted for execution
- if (this.task.getStatus().getStatusCode() == Status.UNSUBMITTED) {
- this.task.setStatus(Status.SUBMITTED);
- if (spec.isBatchJob()) {
- this.task.setStatus(Status.COMPLETED);
+ synchronized(this) {
+ if (this.task.getStatus().getStatusCode() != Status.CANCELED) {
+ new PBSExecutor(task, this).start();
+ this.task.setStatus(Status.SUBMITTED);
+ if (spec.isBatchJob()) {
+ this.task.setStatus(Status.COMPLETED);
+ }
}
}
}
@@ -87,8 +88,9 @@
TaskSubmissionException {
}
- public void cancel() throws InvalidSecurityContextException,
+ public synchronized void cancel() throws InvalidSecurityContextException,
TaskSubmissionException {
+ //TODO actually cancel the job
this.task.setStatus(Status.CANCELED);
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|