|
From: <ha...@us...> - 2007-03-11 19:35:57
|
Revision: 1601
http://svn.sourceforge.net/cogkit/?rev=1601&view=rev
Author: hategan
Date: 2007-03-11 12:35:55 -0700 (Sun, 11 Mar 2007)
Log Message:
-----------
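more efficient implementation: copy the job's stdin/stdout/stderr through a reusable byte buffer instead of line-by-line readers and string concatenation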
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java 2007-03-08 19:48:41 UTC (rev 1600)
+++ trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java 2007-03-11 19:35:55 UTC (rev 1601)
@@ -6,11 +6,12 @@
package org.globus.cog.abstraction.impl.execution.local;
-import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileWriter;
-import java.io.InputStreamReader;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Iterator;
@@ -31,231 +32,233 @@
* @author Kaizar Amin (am...@mc...)
*
*/
-public class JobSubmissionTaskHandler implements DelegatedTaskHandler, Runnable {
- private static Logger logger = Logger.getLogger(JobSubmissionTaskHandler.class);
+public class JobSubmissionTaskHandler implements DelegatedTaskHandler,
+ Runnable {
+ private static Logger logger = Logger
+ .getLogger(JobSubmissionTaskHandler.class);
- private Task task = null;
- private Thread thread = null;
- private Process process;
- private volatile boolean killed;
+ public static final int BUFFER_SIZE = 1024;
- public void submit(Task task) throws IllegalSpecException, InvalidSecurityContextException,
- InvalidServiceContactException, TaskSubmissionException {
- if (this.task != null) {
- throw new TaskSubmissionException(
- "JobSubmissionTaskHandler cannot handle two active jobs simultaneously");
- }
- else {
- this.task = task;
- JobSpecification spec;
- try {
- spec = (JobSpecification) this.task.getSpecification();
- }
- catch (Exception e) {
- throw new IllegalSpecException("Exception while retreiving Job Specification", e);
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug(spec.toString());
- }
+ private Task task = null;
+ private Thread thread = null;
+ private Process process;
+ private volatile boolean killed;
- try {
- this.thread = new Thread(this);
- // check if the task has not been canceled after it was
- // submitted for execution
- if (this.task.getStatus().getStatusCode() == Status.UNSUBMITTED) {
- this.task.setStatus(Status.SUBMITTED);
- this.thread.start();
- if (spec.isBatchJob()) {
- this.task.setStatus(Status.COMPLETED);
- }
- }
- }
- catch (Exception e) {
- Status newStatus = new StatusImpl();
- Status oldStatus = this.task.getStatus();
- newStatus.setPrevStatusCode(oldStatus.getStatusCode());
- newStatus.setStatusCode(Status.FAILED);
- newStatus.setException(e);
- this.task.setStatus(newStatus);
- throw new TaskSubmissionException("Cannot submit job", e);
- }
- }
- }
+ public void submit(Task task) throws IllegalSpecException,
+ InvalidSecurityContextException, InvalidServiceContactException,
+ TaskSubmissionException {
+ if (this.task != null) {
+ throw new TaskSubmissionException(
+ "JobSubmissionTaskHandler cannot handle two active jobs simultaneously");
+ } else {
+ this.task = task;
+ JobSpecification spec;
+ try {
+ spec = (JobSpecification) this.task.getSpecification();
+ } catch (Exception e) {
+ throw new IllegalSpecException(
+ "Exception while retreiving Job Specification", e);
+ }
- public void suspend() throws InvalidSecurityContextException, TaskSubmissionException {
- }
+ if (logger.isDebugEnabled()) {
+ logger.debug(spec.toString());
+ }
- public void resume() throws InvalidSecurityContextException, TaskSubmissionException {
- }
+ try {
+ if (logger.isInfoEnabled()) {
+ logger.info("Submitting task " + task);
+ }
+ this.thread = new Thread(this);
+ // check if the task has not been canceled after it was
+ // submitted for execution
+ if (this.task.getStatus().getStatusCode() == Status.UNSUBMITTED) {
+ this.task.setStatus(Status.SUBMITTED);
+ this.thread.start();
+ if (spec.isBatchJob()) {
+ this.task.setStatus(Status.COMPLETED);
+ }
+ }
+ } catch (Exception e) {
+ Status newStatus = new StatusImpl();
+ Status oldStatus = this.task.getStatus();
+ newStatus.setPrevStatusCode(oldStatus.getStatusCode());
+ newStatus.setStatusCode(Status.FAILED);
+ newStatus.setException(e);
+ this.task.setStatus(newStatus);
+ throw new TaskSubmissionException("Cannot submit job", e);
+ }
+ }
+ }
- public void cancel() throws InvalidSecurityContextException, TaskSubmissionException {
- killed = true;
- process.destroy();
- this.task.setStatus(Status.CANCELED);
- }
+ public void suspend() throws InvalidSecurityContextException,
+ TaskSubmissionException {
+ }
- public void run() {
- try {
- // TODO move away from the multi-threaded approach
- JobSpecification spec = (JobSpecification) this.task.getSpecification();
+ public void resume() throws InvalidSecurityContextException,
+ TaskSubmissionException {
+ }
- File dir = null;
- if (spec.getDirectory() != null) {
- dir = new File(spec.getDirectory());
- }
+ public void cancel() throws InvalidSecurityContextException,
+ TaskSubmissionException {
+ killed = true;
+ process.destroy();
+ this.task.setStatus(Status.CANCELED);
+ }
- process = Runtime.getRuntime().exec(buildCmdArray(spec), buildEnvp(spec), dir);
+ public void run() {
+ try {
+ // TODO move away from the multi-threaded approach
+ JobSpecification spec = (JobSpecification) this.task
+ .getSpecification();
- if (spec.getStdInput() != null) {
- OutputStream out = process.getOutputStream();
-
- File stdin;
- if (dir != null) {
- stdin = new File(dir, spec.getStdInput());
- }
- else {
- stdin = new File(spec.getStdInput());
- }
+ File dir = null;
+ if (spec.getDirectory() != null) {
+ dir = new File(spec.getDirectory());
+ }
- FileInputStream file = new FileInputStream(stdin);
- InputStreamReader inReader = new InputStreamReader(file);
- BufferedReader inBuffer = new BufferedReader(inReader);
- String message = inBuffer.readLine();
- while (message != null) {
- out.write(message.getBytes());
- message = inBuffer.readLine();
- }
- inBuffer.close();
- }
+ process = Runtime.getRuntime().exec(buildCmdArray(spec),
+ buildEnvp(spec), dir);
+ this.task.setStatus(Status.ACTIVE);
- // process output
- InputStreamReader inReader = new InputStreamReader(process.getInputStream());
- BufferedReader inBuffer = new BufferedReader(inReader);
- String message = inBuffer.readLine();
- String output = message;
- while (message != null) {
- message = inBuffer.readLine();
- if (message != null) {
- output += message + "\n";
- }
- }
- if (spec.getStdOutput() == null) {
- // redirect output to the stdOutput of task
- this.task.setStdOutput(output);
- logger.debug("STDOUT from job: " + output);
- }
- else {
- // redirect it to the specified file
- File stdout;
- if (dir != null) {
- stdout = new File(dir, spec.getStdOutput());
- }
- else {
- stdout = new File(spec.getStdOutput());
- }
- FileWriter writer = new FileWriter(stdout);
- if (output != null) {
- writer.write(output);
- writer.flush();
- }
- writer.close();
- }
+ // reusable byte buffer
+ byte[] buf = new byte[BUFFER_SIZE];
+ /*
+ * This should be interleaved with stdout processing, since a
+ * process could block if its STDOUT is not consumed, thus causing a
+ * deadlock
+ */
+ processIN(spec.getStdInput(), dir, buf);
- // process error
- inReader = new InputStreamReader(process.getErrorStream());
- inBuffer = new BufferedReader(inReader);
- message = inBuffer.readLine();
- output = message;
- while (message != null) {
- message = inBuffer.readLine();
- if (message != null) {
- output += message + "\n";
- }
- }
- if (spec.getStdError() == null) {
- // redirect output to the stdError of task
- this.task.setStdError(output);
- logger.debug("STDERR from job: " + output);
- }
- else {
- // redirect it to the specified file
- File stderr;
- if (dir != null) {
- stderr = new File(dir, spec.getStdError());
- }
- else {
- stderr = new File(spec.getStdError());
- }
- FileWriter writer = new FileWriter(stderr);
- if (output != null) {
- writer.write(output);
- writer.flush();
- }
- writer.close();
- }
+ if (spec.getStdOutput() != null || spec.isRedirected()) {
+ String out = processOUT(spec.getStdOutput(), dir, buf,
+ process.getInputStream());
+ if (out != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("STDOUT from job: " + out);
+ }
+ this.task.setStdOutput(out);
+ }
+ }
- if (spec.isBatchJob()) {
- return;
- }
+ if (spec.getStdError() != null || spec.isRedirected()) {
+ String err = processOUT(spec.getStdError(), dir, buf, process
+ .getErrorStream());
+ if (err != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("STDERR from job: " + err);
+ }
+ this.task.setStdError(err);
+ }
+ }
- int exitCode = process.waitFor();
- logger.debug("Exit code was " + exitCode);
- if (killed) {
- return;
- }
- if (exitCode == 0) {
- this.task.setStatus(Status.COMPLETED);
- }
- else {
- throw new Exception("Job failed with an exit code of " + exitCode + "\n" + output);
- }
- }
- catch (Exception e) {
- if (killed) {
- return;
- }
- logger.debug("Exception while running local executable", e);
- Status newStatus = new StatusImpl();
- Status oldStatus = this.task.getStatus();
- newStatus.setPrevStatusCode(oldStatus.getStatusCode());
- newStatus.setStatusCode(Status.FAILED);
- newStatus.setException(e);
- this.task.setStatus(newStatus);
- }
- }
+ if (spec.isBatchJob()) {
+ return;
+ }
- private String[] buildCmdArray(JobSpecification spec) {
- List arguments = spec.getArgumentsAsList();
- String[] cmdarray = new String[arguments.size() + 1];
+ int exitCode = process.waitFor();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Exit code was " + exitCode);
+ }
+ if (killed) {
+ return;
+ }
+ if (exitCode == 0) {
+ this.task.setStatus(Status.COMPLETED);
+ } else {
+ throw new Exception("Job failed with an exit code of "
+ + exitCode);
+ }
+ } catch (Exception e) {
+ if (killed) {
+ return;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Exception while running local executable", e);
+ }
+ Status newStatus = new StatusImpl();
+ Status oldStatus = this.task.getStatus();
+ newStatus.setPrevStatusCode(oldStatus.getStatusCode());
+ newStatus.setStatusCode(Status.FAILED);
+ newStatus.setException(e);
+ this.task.setStatus(newStatus);
+ }
+ }
- cmdarray[0] = spec.getExecutable();
- Iterator i = arguments.iterator();
- int index = 1;
- while (i.hasNext()) {
- cmdarray[index++] = (String) i.next();
- }
- return cmdarray;
- }
+ protected void processIN(String in, File dir, byte[] buf)
+ throws IOException {
+ if (in != null) {
+ OutputStream out = process.getOutputStream();
- private String[] buildEnvp(JobSpecification spec) {
- Collection names = spec.getEnvironmentVariableNames();
- if (names.size() == 0) {
- /*
- * Questionable. An envp of null will cause the parent environment
- * to be inherited, while an empty one will cause no environment
- * variables to be set for the process. Or so it seems from the
- * Runtime.exec docs.
- */
- return null;
- }
- String[] envp = new String[names.size()];
- Iterator i = names.iterator();
- int index = 0;
- while (i.hasNext()) {
- String name = (String) i.next();
- envp[index++] = name + "=" + spec.getEnvironmentVariable(name);
- }
- return envp;
- }
-}
\ No newline at end of file
+ File stdin;
+ if (dir != null) {
+ stdin = new File(dir, in);
+ } else {
+ stdin = new File(in);
+ }
+
+ FileInputStream file = new FileInputStream(stdin);
+ int read = file.read(buf);
+ while (read != -1) {
+ out.write(buf, 0, read);
+ read = file.read(buf);
+ }
+ file.close();
+ }
+ }
+
+ protected String processOUT(String out, File dir, byte[] buf,
+ InputStream pin) throws IOException {
+
+ OutputStream os;
+ if (out == null) {
+ os = new ByteArrayOutputStream();
+ } else {
+ os = new FileOutputStream(out);
+ }
+ int len = pin.read(buf);
+ while (len != -1) {
+ os.write(buf, 0, len);
+ len = pin.read(buf);
+ }
+ os.close();
+ if (out == null) {
+ return os.toString();
+ } else {
+ return null;
+ }
+ }
+
+ private String[] buildCmdArray(JobSpecification spec) {
+ List arguments = spec.getArgumentsAsList();
+ String[] cmdarray = new String[arguments.size() + 1];
+
+ cmdarray[0] = spec.getExecutable();
+ Iterator i = arguments.iterator();
+ int index = 1;
+ while (i.hasNext()) {
+ cmdarray[index++] = (String) i.next();
+ }
+ return cmdarray;
+ }
+
+ private String[] buildEnvp(JobSpecification spec) {
+ Collection names = spec.getEnvironmentVariableNames();
+ if (names.size() == 0) {
+ /*
+ * Questionable. An envp of null will cause the parent environment
+ * to be inherited, while an empty one will cause no environment
+ * variables to be set for the process. Or so it seems from the
+ * Runtime.exec docs.
+ */
+ return null;
+ }
+ String[] envp = new String[names.size()];
+ Iterator i = names.iterator();
+ int index = 0;
+ while (i.hasNext()) {
+ String name = (String) i.next();
+ envp[index++] = name + "=" + spec.getEnvironmentVariable(name);
+ }
+ return envp;
+ }
+}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2007-09-19 20:43:49
|
Revision: 1751
http://cogkit.svn.sourceforge.net/cogkit/?rev=1751&view=rev
Author: hategan
Date: 2007-09-19 13:43:48 -0700 (Wed, 19 Sep 2007)
Log Message:
-----------
don't set status if exception is thrown
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java 2007-09-19 20:43:23 UTC (rev 1750)
+++ trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java 2007-09-19 20:43:48 UTC (rev 1751)
@@ -83,12 +83,6 @@
}
}
} catch (Exception e) {
- Status newStatus = new StatusImpl();
- Status oldStatus = this.task.getStatus();
- newStatus.setPrevStatusCode(oldStatus.getStatusCode());
- newStatus.setStatusCode(Status.FAILED);
- newStatus.setException(e);
- this.task.setStatus(newStatus);
throw new TaskSubmissionException("Cannot submit job", e);
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-02-12 17:29:15
|
Revision: 1892
http://cogkit.svn.sourceforge.net/cogkit/?rev=1892&view=rev
Author: hategan
Date: 2008-02-12 09:29:05 -0800 (Tue, 12 Feb 2008)
Log Message:
-----------
fixed cancelling and weird status checks: if you want to see if it was canceled then don't check if it's unsubmitted; those are two different things
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java 2008-02-12 17:27:23 UTC (rev 1891)
+++ trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java 2008-02-12 17:29:05 UTC (rev 1892)
@@ -54,8 +54,13 @@
if (this.task != null) {
throw new TaskSubmissionException(
"JobSubmissionTaskHandler cannot handle two active jobs simultaneously");
- } else {
+ }
+ else if (this.task.getStatus().getStatusCode() != Status.UNSUBMITTED) {
+ throw new TaskSubmissionException("Task is not in unsubmitted state");
+ }
+ else {
this.task = task;
+ task.setStatus(Status.SUBMITTING);
JobSpecification spec;
try {
spec = (JobSpecification) this.task.getSpecification();
@@ -72,14 +77,14 @@
if (logger.isInfoEnabled()) {
logger.info("Submitting task " + task);
}
- this.thread = new Thread(this);
- // check if the task has not been canceled after it was
- // submitted for execution
- if (this.task.getStatus().getStatusCode() == Status.UNSUBMITTED) {
- this.task.setStatus(Status.SUBMITTED);
- this.thread.start();
- if (spec.isBatchJob()) {
- this.task.setStatus(Status.COMPLETED);
+ synchronized(this) {
+ this.thread = new Thread(this);
+ if (this.task.getStatus().getStatusCode() != Status.CANCELED) {
+ this.task.setStatus(Status.SUBMITTED);
+ this.thread.start();
+ if (spec.isBatchJob()) {
+ this.task.setStatus(Status.COMPLETED);
+ }
}
}
} catch (Exception e) {
@@ -98,9 +103,11 @@
public void cancel() throws InvalidSecurityContextException,
TaskSubmissionException {
- killed = true;
- process.destroy();
- this.task.setStatus(Status.CANCELED);
+ synchronized(this) {
+ killed = true;
+ process.destroy();
+ this.task.setStatus(Status.CANCELED);
+ }
}
private static final FileLocation REDIRECT_LOCATION = FileLocation.MEMORY
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-02-12 20:20:11
|
Revision: 1901
http://cogkit.svn.sourceforge.net/cogkit/?rev=1901&view=rev
Author: hategan
Date: 2008-02-12 12:20:02 -0800 (Tue, 12 Feb 2008)
Log Message:
-----------
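fixed NPE: the unsubmitted-state check dereferenced this.task, which is still null at that point; check the task argument instead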
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java 2008-02-12 17:58:19 UTC (rev 1900)
+++ trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java 2008-02-12 20:20:02 UTC (rev 1901)
@@ -55,7 +55,7 @@
throw new TaskSubmissionException(
"JobSubmissionTaskHandler cannot handle two active jobs simultaneously");
}
- else if (this.task.getStatus().getStatusCode() != Status.UNSUBMITTED) {
+ else if (task.getStatus().getStatusCode() != Status.UNSUBMITTED) {
throw new TaskSubmissionException("Task is not in unsubmitted state");
}
else {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|