| 
      
      
      From: <ha...@us...> - 2007-03-11 19:35:57
       | 
| Revision: 1601
          http://svn.sourceforge.net/cogkit/?rev=1601&view=rev
Author:   hategan
Date:     2007-03-11 12:35:55 -0700 (Sun, 11 Mar 2007)
Log Message:
-----------
more efficient implementation
Modified Paths:
--------------
    trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java	2007-03-08 19:48:41 UTC (rev 1600)
+++ trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java	2007-03-11 19:35:55 UTC (rev 1601)
@@ -6,11 +6,12 @@
 
 package org.globus.cog.abstraction.impl.execution.local;
 
-import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileWriter;
-import java.io.InputStreamReader;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Iterator;
@@ -31,231 +32,233 @@
  * @author Kaizar Amin (am...@mc...)
  * 
  */
-public class JobSubmissionTaskHandler implements DelegatedTaskHandler, Runnable {
-	private static Logger logger = Logger.getLogger(JobSubmissionTaskHandler.class);
+public class JobSubmissionTaskHandler implements DelegatedTaskHandler,
+        Runnable {
+    private static Logger logger = Logger
+            .getLogger(JobSubmissionTaskHandler.class);
 
-	private Task task = null;
-	private Thread thread = null;
-	private Process process;
-	private volatile boolean killed;
+    public static final int BUFFER_SIZE = 1024;
 
-	public void submit(Task task) throws IllegalSpecException, InvalidSecurityContextException,
-			InvalidServiceContactException, TaskSubmissionException {
-		if (this.task != null) {
-			throw new TaskSubmissionException(
-					"JobSubmissionTaskHandler cannot handle two active jobs simultaneously");
-		}
-		else {
-			this.task = task;
-			JobSpecification spec;
-			try {
-				spec = (JobSpecification) this.task.getSpecification();
-			}
-			catch (Exception e) {
-				throw new IllegalSpecException("Exception while retreiving Job Specification", e);
-			}
-			
-			if (logger.isDebugEnabled()) {
-				logger.debug(spec.toString());
-			}
+    private Task task = null;
+    private Thread thread = null;
+    private Process process;
+    private volatile boolean killed;
 
-			try {
-				this.thread = new Thread(this);
-				// check if the task has not been canceled after it was
-				// submitted for execution
-				if (this.task.getStatus().getStatusCode() == Status.UNSUBMITTED) {
-					this.task.setStatus(Status.SUBMITTED);
-					this.thread.start();
-					if (spec.isBatchJob()) {
-						this.task.setStatus(Status.COMPLETED);
-					}
-				}
-			}
-			catch (Exception e) {
-				Status newStatus = new StatusImpl();
-				Status oldStatus = this.task.getStatus();
-				newStatus.setPrevStatusCode(oldStatus.getStatusCode());
-				newStatus.setStatusCode(Status.FAILED);
-				newStatus.setException(e);
-				this.task.setStatus(newStatus);
-				throw new TaskSubmissionException("Cannot submit job", e);
-			}
-		}
-	}
+    public void submit(Task task) throws IllegalSpecException,
+            InvalidSecurityContextException, InvalidServiceContactException,
+            TaskSubmissionException {
+        if (this.task != null) {
+            throw new TaskSubmissionException(
+                    "JobSubmissionTaskHandler cannot handle two active jobs simultaneously");
+        } else {
+            this.task = task;
+            JobSpecification spec;
+            try {
+                spec = (JobSpecification) this.task.getSpecification();
+            } catch (Exception e) {
+                throw new IllegalSpecException(
+                        "Exception while retreiving Job Specification", e);
+            }
 
-	public void suspend() throws InvalidSecurityContextException, TaskSubmissionException {
-	}
+            if (logger.isDebugEnabled()) {
+                logger.debug(spec.toString());
+            }
 
-	public void resume() throws InvalidSecurityContextException, TaskSubmissionException {
-	}
+            try {
+                if (logger.isInfoEnabled()) {
+                    logger.info("Submitting task " + task);
+                }
+                this.thread = new Thread(this);
+                // check if the task has not been canceled after it was
+                // submitted for execution
+                if (this.task.getStatus().getStatusCode() == Status.UNSUBMITTED) {
+                    this.task.setStatus(Status.SUBMITTED);
+                    this.thread.start();
+                    if (spec.isBatchJob()) {
+                        this.task.setStatus(Status.COMPLETED);
+                    }
+                }
+            } catch (Exception e) {
+                Status newStatus = new StatusImpl();
+                Status oldStatus = this.task.getStatus();
+                newStatus.setPrevStatusCode(oldStatus.getStatusCode());
+                newStatus.setStatusCode(Status.FAILED);
+                newStatus.setException(e);
+                this.task.setStatus(newStatus);
+                throw new TaskSubmissionException("Cannot submit job", e);
+            }
+        }
+    }
 
-	public void cancel() throws InvalidSecurityContextException, TaskSubmissionException {
-		killed = true;
-		process.destroy();
-		this.task.setStatus(Status.CANCELED);
-	}
+    public void suspend() throws InvalidSecurityContextException,
+            TaskSubmissionException {
+    }
 
-	public void run() {
-		try {
-			// TODO move away from the multi-threaded approach
-			JobSpecification spec = (JobSpecification) this.task.getSpecification();
+    public void resume() throws InvalidSecurityContextException,
+            TaskSubmissionException {
+    }
 
-			File dir = null;
-			if (spec.getDirectory() != null) {
-				dir = new File(spec.getDirectory());
-			}
+    public void cancel() throws InvalidSecurityContextException,
+            TaskSubmissionException {
+        killed = true;
+        process.destroy();
+        this.task.setStatus(Status.CANCELED);
+    }
 
-			process = Runtime.getRuntime().exec(buildCmdArray(spec), buildEnvp(spec), dir);
+    public void run() {
+        try {
+            // TODO move away from the multi-threaded approach
+            JobSpecification spec = (JobSpecification) this.task
+                    .getSpecification();
 
-			if (spec.getStdInput() != null) {
-				OutputStream out = process.getOutputStream();
-				
-				File stdin;
-				if (dir != null) {
-					stdin = new File(dir, spec.getStdInput());
-				}
-				else {
-					stdin = new File(spec.getStdInput());
-				}
+            File dir = null;
+            if (spec.getDirectory() != null) {
+                dir = new File(spec.getDirectory());
+            }
 
-				FileInputStream file = new FileInputStream(stdin);
-				InputStreamReader inReader = new InputStreamReader(file);
-				BufferedReader inBuffer = new BufferedReader(inReader);
-				String message = inBuffer.readLine();
-				while (message != null) {
-					out.write(message.getBytes());
-					message = inBuffer.readLine();
-				}
-				inBuffer.close();
-			}
+            process = Runtime.getRuntime().exec(buildCmdArray(spec),
+                    buildEnvp(spec), dir);
+            this.task.setStatus(Status.ACTIVE);
 
-			// process output
-			InputStreamReader inReader = new InputStreamReader(process.getInputStream());
-			BufferedReader inBuffer = new BufferedReader(inReader);
-			String message = inBuffer.readLine();
-			String output = message;
-			while (message != null) {
-				message = inBuffer.readLine();
-				if (message != null) {
-					output += message + "\n";
-				}
-			}
-			if (spec.getStdOutput() == null) {
-				// redirect output to the stdOutput of task
-				this.task.setStdOutput(output);
-				logger.debug("STDOUT from job: " + output);
-			}
-			else {
-				// redirect it to the specified file
-				File stdout;
-				if (dir != null) {
-					stdout = new File(dir, spec.getStdOutput());
-				}
-				else {
-					stdout = new File(spec.getStdOutput());
-				}
-				FileWriter writer = new FileWriter(stdout);
-				if (output != null) {
-					writer.write(output);
-					writer.flush();
-				}
-				writer.close();
-			}
+            // reusable byte buffer
+            byte[] buf = new byte[BUFFER_SIZE];
+            /*
+             * This should be interleaved with stdout processing, since a
+             * process could block if its STDOUT is not consumed, thus causing a
+             * deadlock
+             */
+            processIN(spec.getStdInput(), dir, buf);
 
-			// process error
-			inReader = new InputStreamReader(process.getErrorStream());
-			inBuffer = new BufferedReader(inReader);
-			message = inBuffer.readLine();
-			output = message;
-			while (message != null) {
-				message = inBuffer.readLine();
-				if (message != null) {
-					output += message + "\n";
-				}
-			}
-			if (spec.getStdError() == null) {
-				// redirect output to the stdError of task
-				this.task.setStdError(output);
-				logger.debug("STDERR from job: " + output);
-			}
-			else {
-				// redirect it to the specified file
-				File stderr;
-				if (dir != null) {
-					stderr = new File(dir, spec.getStdError());
-				}
-				else {
-					stderr = new File(spec.getStdError());
-				}
-				FileWriter writer = new FileWriter(stderr);
-				if (output != null) {
-					writer.write(output);
-					writer.flush();
-				}
-				writer.close();
-			}
+            if (spec.getStdOutput() != null || spec.isRedirected()) {
+                String out = processOUT(spec.getStdOutput(), dir, buf,
+                        process.getInputStream());
+                if (out != null) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("STDOUT from job: " + out);
+                    }
+                    this.task.setStdOutput(out);
+                }
+            }
 
-			if (spec.isBatchJob()) {
-				return;
-			}
+            if (spec.getStdError() != null || spec.isRedirected()) {
+                String err = processOUT(spec.getStdError(), dir, buf, process
+                        .getErrorStream());
+                if (err != null) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("STDERR from job: " + err);
+                    }
+                    this.task.setStdError(err);
+                }
+            }
 
-			int exitCode = process.waitFor();
-			logger.debug("Exit code was " + exitCode);
-			if (killed) {
-				return;
-			}
-			if (exitCode == 0) {
-				this.task.setStatus(Status.COMPLETED);
-			}
-			else {
-				throw new Exception("Job failed with an exit code of " + exitCode + "\n" + output);
-			}
-		}
-		catch (Exception e) {
-			if (killed) {
-				return;
-			}
-			logger.debug("Exception while running local executable", e);
-			Status newStatus = new StatusImpl();
-			Status oldStatus = this.task.getStatus();
-			newStatus.setPrevStatusCode(oldStatus.getStatusCode());
-			newStatus.setStatusCode(Status.FAILED);
-			newStatus.setException(e);
-			this.task.setStatus(newStatus);
-		}
-	}
+            if (spec.isBatchJob()) {
+                return;
+            }
 
-	private String[] buildCmdArray(JobSpecification spec) {
-		List arguments = spec.getArgumentsAsList();
-		String[] cmdarray = new String[arguments.size() + 1];
+            int exitCode = process.waitFor();
+            if (logger.isDebugEnabled()) {
+                logger.debug("Exit code was " + exitCode);
+            }
+            if (killed) {
+                return;
+            }
+            if (exitCode == 0) {
+                this.task.setStatus(Status.COMPLETED);
+            } else {
+                throw new Exception("Job failed with an exit code of "
+                        + exitCode);
+            }
+        } catch (Exception e) {
+            if (killed) {
+                return;
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("Exception while running local executable", e);
+            }
+            Status newStatus = new StatusImpl();
+            Status oldStatus = this.task.getStatus();
+            newStatus.setPrevStatusCode(oldStatus.getStatusCode());
+            newStatus.setStatusCode(Status.FAILED);
+            newStatus.setException(e);
+            this.task.setStatus(newStatus);
+        }
+    }
 
-		cmdarray[0] = spec.getExecutable();
-		Iterator i = arguments.iterator();
-		int index = 1;
-		while (i.hasNext()) {
-			cmdarray[index++] = (String) i.next();
-		}
-		return cmdarray;
-	}
+    protected void processIN(String in, File dir, byte[] buf)
+            throws IOException {
+        if (in != null) {
+            OutputStream out = process.getOutputStream();
 
-	private String[] buildEnvp(JobSpecification spec) {
-		Collection names = spec.getEnvironmentVariableNames();
-		if (names.size() == 0) {
-			/*
-			 * Questionable. An envp of null will cause the parent environment
-			 * to be inherited, while an empty one will cause no environment
-			 * variables to be set for the process. Or so it seems from the
-			 * Runtime.exec docs.
-			 */
-			return null;
-		}
-		String[] envp = new String[names.size()];
-		Iterator i = names.iterator();
-		int index = 0;
-		while (i.hasNext()) {
-			String name = (String) i.next();
-			envp[index++] = name + "=" + spec.getEnvironmentVariable(name);
-		}
-		return envp;
-	}
-}
\ No newline at end of file
+            File stdin;
+            if (dir != null) {
+                stdin = new File(dir, in);
+            } else {
+                stdin = new File(in);
+            }
+
+            FileInputStream file = new FileInputStream(stdin);
+            int read = file.read(buf);
+            while (read != -1) {
+                out.write(buf, 0, read);
+                read = file.read(buf);
+            }
+            file.close();
+        }
+    }
+
+    protected String processOUT(String out, File dir, byte[] buf,
+            InputStream pin) throws IOException {
+
+        OutputStream os;
+        if (out == null) {
+            os = new ByteArrayOutputStream();
+        } else {
+            os = new FileOutputStream(out);
+        }
+        int len = pin.read(buf);
+        while (len != -1) {
+            os.write(buf, 0, len);
+            len = pin.read(buf);
+        }
+        os.close();
+        if (out == null) {
+            return os.toString();
+        } else {
+            return null;
+        }
+    }
+
+    private String[] buildCmdArray(JobSpecification spec) {
+        List arguments = spec.getArgumentsAsList();
+        String[] cmdarray = new String[arguments.size() + 1];
+
+        cmdarray[0] = spec.getExecutable();
+        Iterator i = arguments.iterator();
+        int index = 1;
+        while (i.hasNext()) {
+            cmdarray[index++] = (String) i.next();
+        }
+        return cmdarray;
+    }
+
+    private String[] buildEnvp(JobSpecification spec) {
+        Collection names = spec.getEnvironmentVariableNames();
+        if (names.size() == 0) {
+            /*
+             * Questionable. An envp of null will cause the parent environment
+             * to be inherited, while an empty one will cause no environment
+             * variables to be set for the process. Or so it seems from the
+             * Runtime.exec docs.
+             */
+            return null;
+        }
+        String[] envp = new String[names.size()];
+        Iterator i = names.iterator();
+        int index = 0;
+        while (i.hasNext()) {
+            String name = (String) i.next();
+            envp[index++] = name + "=" + spec.getEnvironmentVariable(name);
+        }
+        return envp;
+    }
+}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
      
      
      From: <ha...@us...> - 2007-09-19 20:43:49
       | 
| Revision: 1751
          http://cogkit.svn.sourceforge.net/cogkit/?rev=1751&view=rev
Author:   hategan
Date:     2007-09-19 13:43:48 -0700 (Wed, 19 Sep 2007)
Log Message:
-----------
don't set status if exception is thrown
Modified Paths:
--------------
    trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java	2007-09-19 20:43:23 UTC (rev 1750)
+++ trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java	2007-09-19 20:43:48 UTC (rev 1751)
@@ -83,12 +83,6 @@
                     }
                 }
             } catch (Exception e) {
-                Status newStatus = new StatusImpl();
-                Status oldStatus = this.task.getStatus();
-                newStatus.setPrevStatusCode(oldStatus.getStatusCode());
-                newStatus.setStatusCode(Status.FAILED);
-                newStatus.setException(e);
-                this.task.setStatus(newStatus);
                 throw new TaskSubmissionException("Cannot submit job", e);
             }
         }
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
      
      
      From: <ha...@us...> - 2008-02-12 17:29:15
       | 
| Revision: 1892
          http://cogkit.svn.sourceforge.net/cogkit/?rev=1892&view=rev
Author:   hategan
Date:     2008-02-12 09:29:05 -0800 (Tue, 12 Feb 2008)
Log Message:
-----------
fixed cancelling and weird status checks: if you want to see if it was canceled then don't check if it's unsubmitted; those are two different things
Modified Paths:
--------------
    trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java	2008-02-12 17:27:23 UTC (rev 1891)
+++ trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java	2008-02-12 17:29:05 UTC (rev 1892)
@@ -54,8 +54,13 @@
         if (this.task != null) {
             throw new TaskSubmissionException(
                     "JobSubmissionTaskHandler cannot handle two active jobs simultaneously");
-        } else {
+        }
+        else if (this.task.getStatus().getStatusCode() != Status.UNSUBMITTED) {
+            throw new TaskSubmissionException("Task is not in unsubmitted state");
+        }
+        else {
             this.task = task;
+            task.setStatus(Status.SUBMITTING);
             JobSpecification spec;
             try {
                 spec = (JobSpecification) this.task.getSpecification();
@@ -72,14 +77,14 @@
                 if (logger.isInfoEnabled()) {
                     logger.info("Submitting task " + task);
                 }
-                this.thread = new Thread(this);
-                // check if the task has not been canceled after it was
-                // submitted for execution
-                if (this.task.getStatus().getStatusCode() == Status.UNSUBMITTED) {
-                    this.task.setStatus(Status.SUBMITTED);
-                    this.thread.start();
-                    if (spec.isBatchJob()) {
-                        this.task.setStatus(Status.COMPLETED);
+                synchronized(this) {
+                    this.thread = new Thread(this);
+                    if (this.task.getStatus().getStatusCode() != Status.CANCELED) {
+                        this.task.setStatus(Status.SUBMITTED);
+                        this.thread.start();
+                        if (spec.isBatchJob()) {
+                            this.task.setStatus(Status.COMPLETED);
+                        }
                     }
                 }
             } catch (Exception e) {
@@ -98,9 +103,11 @@
 
     public void cancel() throws InvalidSecurityContextException,
             TaskSubmissionException {
-        killed = true;
-        process.destroy();
-        this.task.setStatus(Status.CANCELED);
+        synchronized(this) {
+            killed = true;
+            process.destroy();
+            this.task.setStatus(Status.CANCELED);
+        }
     }
 
     private static final FileLocation REDIRECT_LOCATION = FileLocation.MEMORY
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
      
      
      From: <ha...@us...> - 2008-02-12 20:20:11
       | 
| Revision: 1901
          http://cogkit.svn.sourceforge.net/cogkit/?rev=1901&view=rev
Author:   hategan
Date:     2008-02-12 12:20:02 -0800 (Tue, 12 Feb 2008)
Log Message:
-----------
fixed npe
Modified Paths:
--------------
    trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java	2008-02-12 17:58:19 UTC (rev 1900)
+++ trunk/current/src/cog/modules/provider-local/src/org/globus/cog/abstraction/impl/execution/local/JobSubmissionTaskHandler.java	2008-02-12 20:20:02 UTC (rev 1901)
@@ -55,7 +55,7 @@
             throw new TaskSubmissionException(
                     "JobSubmissionTaskHandler cannot handle two active jobs simultaneously");
         }
-        else if (this.task.getStatus().getStatusCode() != Status.UNSUBMITTED) {
+        else if (task.getStatus().getStatusCode() != Status.UNSUBMITTED) {
             throw new TaskSubmissionException("Task is not in unsubmitted state");
         }
         else {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 |