|
From: <ha...@us...> - 2007-02-12 23:55:08
|
Revision: 1575
http://svn.sourceforge.net/cogkit/?rev=1575&view=rev
Author: hategan
Date: 2007-02-12 15:55:03 -0800 (Mon, 12 Feb 2007)
Log Message:
-----------
added providers for PBS and Cobalt
Added Paths:
-----------
trunk/current/src/cog/modules/provider-localscheduler/
trunk/current/src/cog/modules/provider-localscheduler/CHANGES.txt
trunk/current/src/cog/modules/provider-localscheduler/build.xml
trunk/current/src/cog/modules/provider-localscheduler/dependencies.xml
trunk/current/src/cog/modules/provider-localscheduler/etc/
trunk/current/src/cog/modules/provider-localscheduler/etc/MANIFEST.MF.head
trunk/current/src/cog/modules/provider-localscheduler/etc/MANIFEST.MF.tail
trunk/current/src/cog/modules/provider-localscheduler/etc/log4j.properties.module
trunk/current/src/cog/modules/provider-localscheduler/etc/provider-cobalt.properties
trunk/current/src/cog/modules/provider-localscheduler/etc/provider-pbs.properties
trunk/current/src/cog/modules/provider-localscheduler/launchers.xml
trunk/current/src/cog/modules/provider-localscheduler/lib/
trunk/current/src/cog/modules/provider-localscheduler/project.properties
trunk/current/src/cog/modules/provider-localscheduler/resources/
trunk/current/src/cog/modules/provider-localscheduler/resources/cog-provider.properties
trunk/current/src/cog/modules/provider-localscheduler/src/
trunk/current/src/cog/modules/provider-localscheduler/src/org/
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/CobaltExecutor.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/Properties.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/QueuePoller.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/JobSubmissionTaskHandler.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/TaskHandlerImpl.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/Job.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/ProcessException.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/ProcessListener.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/PBSExecutor.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/Properties.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/QueuePoller.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/JobSubmissionTaskHandler.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/TaskHandlerImpl.java
Added: trunk/current/src/cog/modules/provider-localscheduler/CHANGES.txt
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/CHANGES.txt (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/CHANGES.txt 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,3 @@
+(02/12/07)
+
+*** Initial version
\ No newline at end of file
Added: trunk/current/src/cog/modules/provider-localscheduler/build.xml
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/build.xml (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/build.xml 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,152 @@
+<project name="Java CoG Kit" default="dist" basedir=".">
+
+ <property file="project.properties"/>
+ <property name="cog.dir" value="${basedir}/../../"/>
+ <property name="main.buildfile" value="${cog.dir}/mbuild.xml"/>
+ <property name="dist.dir" value="${cog.dir}/modules/${module.name}/dist/${module.name}-${version}"/>
+ <property name="build.dir" value="${cog.dir}/modules/${module.name}/build"/>
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Help | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="help">
+ <echo>
+ Available targets:
+ help:
+ prints out this help message
+
+ dist:
+ creates a distribution directory of the
+ ${project} ${long.name}
+
+ jar:
+ creates a jar file for the ${project} ${long.name}
+ named ${jar.filename}
+
+ javadoc:
+ creates the documentation
+
+ clean:
+ removes the compiled classes
+
+ distclean:
+ deletes the distribution directory
+
+ all:
+ dist and javadoc
+
+ deploy.webstart:
+ deploys the module as a webstart application
+
+ dist.joint:
+ builds everything into one jar file. Should only
+ be used globally (from all)
+
+ fixeol:
+ change newlines to the unix standard
+ </echo>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Dist | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="dist" unless="no.providers">
+ <ant antfile="${main.buildfile}" target="dist"/>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Compile | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="compile">
+ <ant antfile="${main.buildfile}" target="compile"/>
+ </target>
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Clean | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="clean">
+ <ant antfile="${main.buildfile}" target="clean"/>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Distclean | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="distclean">
+ <ant antfile="${main.buildfile}" target="distclean"/>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Jar | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="jar">
+ <ant antfile="${main.buildfile}" target="jar"/>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Javadoc | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="javadoc">
+ <ant antfile="${main.buildfile}" target="javadoc"/>
+ </target>
+
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | PMD | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="pmd">
+ <ant antfile="${main.buildfile}" target="pmd"/>
+ </target>
+
+ <target name="deploy.webstart">
+ <ant antfile="${main.buildfile}" target="deploy.webstart"/>
+ </target>
+
+ <target name="replacelibs">
+ <ant antfile="${main.buildfile}" target="replacelibs"/>
+ </target>
+
+ <target name="webstart.launchers">
+ <ant antfile="${main.buildfile}" target="webstart.launchers"/>
+ </target>
+
+ <target name="dist.joint">
+ <ant antfile="${main.buildfile}" target="dist.all"/>
+ </target>
+
+ <target name="module.package">
+ <ant antfile="${main.buildfile}" target="module.package"/>
+ </target>
+
+ <!-- ================================================ -->
+ <!-- fixeol -->
+ <!-- ================================================ -->
+
+ <target name="fixeol">
+ <ant antfile="${main.buildfile}" target="fixeol"/>
+ </target>
+
+</project>
+
+
Added: trunk/current/src/cog/modules/provider-localscheduler/dependencies.xml
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/dependencies.xml (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/dependencies.xml 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,14 @@
+<project name="Project dependencies" default="deps" basedir=".">
+ <!-- project dependencies -->
+ <target name="deps">
+ <ant antfile="${main.buildfile}" target="dep">
+ <property name="module" value="abstraction-common"/>
+ </ant>
+ <ant antfile="${main.buildfile}" target="dep">
+ <property name="module" value="jglobus"/>
+ </ant>
+ <ant antfile="${main.buildfile}" target="dep">
+ <property name="module" value="util"/>
+ </ant>
+ </target>
+</project>
Added: trunk/current/src/cog/modules/provider-localscheduler/etc/MANIFEST.MF.head
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/etc/MANIFEST.MF.head (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/etc/MANIFEST.MF.head 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1 @@
+Manifest-Version: 1.0
Added: trunk/current/src/cog/modules/provider-localscheduler/etc/MANIFEST.MF.tail
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/etc/MANIFEST.MF.tail (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/etc/MANIFEST.MF.tail 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1 @@
+
Added: trunk/current/src/cog/modules/provider-localscheduler/etc/log4j.properties.module
===================================================================
Added: trunk/current/src/cog/modules/provider-localscheduler/etc/provider-cobalt.properties
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/etc/provider-cobalt.properties (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/etc/provider-cobalt.properties 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,16 @@
+#
+# The interval, in seconds, at which the provider will poll the Cobalt
+# queue for status updates. There is at most one poll thread per JVM,
+# which is shared by all the jobs submitted through the Cobalt provider.
+#
+poll.interval=5
+
+#
+# The path to cqsub. The default assumes that cqsub is in PATH
+#
+cqsub=cqsub
+
+#
+# The path to cqstat. The default assumes that cqstat is in PATH
+#
+cqstat=cqstat
Added: trunk/current/src/cog/modules/provider-localscheduler/etc/provider-pbs.properties
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/etc/provider-pbs.properties (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/etc/provider-pbs.properties 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,16 @@
+#
+# The interval, in seconds, at which the provider will poll the PBS
+# queue for status updates. There is at most one poll thread per JVM,
+# which is shared by all the jobs submitted through the PBS provider.
+#
+poll.interval=5
+
+#
+# The path to qsub. The default assumes that qsub is in PATH
+#
+qsub=qsub
+
+#
+# The path to qstat. The default assumes that qstat is in PATH
+#
+qstat=qstat
Added: trunk/current/src/cog/modules/provider-localscheduler/launchers.xml
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/launchers.xml (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/launchers.xml 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,6 @@
+<project name="Launchers" default="create" basedir=".">
+ <target name="create">
+ </target>
+ <target name="webstart">
+ </target>
+</project>
Added: trunk/current/src/cog/modules/provider-localscheduler/project.properties
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/project.properties (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/project.properties 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,7 @@
+module.name = provider-localscheduler
+long.name = Local providers for PBS/Torque and Cobalt
+version = 1.0
+project = Java CoG Kit
+lib.deps = -
+debug = true
+
Added: trunk/current/src/cog/modules/provider-localscheduler/resources/cog-provider.properties
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/resources/cog-provider.properties (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/resources/cog-provider.properties 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,13 @@
+provider=pbs
+sandbox=false
+executionTaskHandler=org.globus.cog.abstraction.impl.scheduler.pbs.execution.TaskHandlerImpl
+securityContext=org.globus.cog.abstraction.impl.common.task.SecurityContextImpl
+
+alias=pbslocal:pbs
+
+provider=cobalt
+sandbox=false
+executionTaskHandler=org.globus.cog.abstraction.impl.scheduler.cobalt.execution.TaskHandlerImpl
+securityContext=org.globus.cog.abstraction.impl.common.task.SecurityContextImpl
+
+alias=cobaltlocal:cobalt
Added: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/CobaltExecutor.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/CobaltExecutor.java (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/CobaltExecutor.java 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,286 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 11, 2005
+ */
+package org.globus.cog.abstraction.impl.scheduler.cobalt;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessException;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Task;
+import org.globus.gsi.gssapi.auth.AuthorizationException;
+import org.ietf.jgss.GSSException;
+
+public class CobaltExecutor implements ProcessListener {
+ public static final Logger logger = Logger.getLogger(CobaltExecutor.class);
+
+ private JobSpecification spec;
+ private Task task;
+ private static QueuePoller poller;
+ private ProcessListener listener;
+ private String stdout, stderr, exitcode;
+ private File script;
+ private String cqsub;
+
+ private static final String[] EMPTY_STRING_ARRAY = new String[0];
+
+ public CobaltExecutor(Task task, ProcessListener listener) {
+ this.task = task;
+ this.spec = (JobSpecification) task.getSpecification();
+ this.listener = listener;
+ this.cqsub = Properties.getProperties().getCQSub();
+ }
+
+ private static synchronized QueuePoller getProcessPoller() {
+ if (poller == null) {
+ poller = new QueuePoller();
+ poller.start();
+ }
+ return poller;
+ }
+
+ public void start() throws AuthorizationException, GSSException,
+ IOException, ProcessException {
+ File scriptdir = new File(System.getProperty("user.home")
+ + File.separatorChar + ".globus" + File.separatorChar
+ + "scripts");
+ scriptdir.mkdirs();
+ if (!scriptdir.exists()) {
+ throw new IOException("Failed to create script directory ("
+ + scriptdir + ")");
+ }
+ script = File.createTempFile("cobalt", ".sh", scriptdir);
+ stdout = spec.getStdOutput() == null ? script.getAbsolutePath()
+ + ".stdout" : spec.getStdOutput();
+ stderr = spec.getStdError() == null ? script.getAbsolutePath()
+ + ".stderr" : spec.getStdError();
+ exitcode = script.getAbsolutePath() + ".exitcode";
+ writeScript(new BufferedWriter(new FileWriter(script)), exitcode,
+ stdout, stderr);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Wrote submit script to " + script);
+ }
+ String[] cmdline = buildCMDLine(script.getAbsolutePath());
+
+ Process process = Runtime.getRuntime().exec(cmdline, null, null);
+
+ try {
+ process.getOutputStream().close();
+ }
+ catch (IOException e) {
+ }
+
+ try {
+ int code = process.waitFor();
+ if (code != 0) {
+ throw new ProcessException(
+ "Could not submit job (qsub reported an exit code of "
+ + code + "). "
+ + getOutput(process.getErrorStream()));
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("QSub done (exit code " + code + ")");
+ }
+ }
+ catch (InterruptedException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Interrupted exception while waiting for qsub", e);
+ }
+ if (listener != null) {
+ listener
+ .processFailed("The submission process was interrupted");
+ }
+ }
+
+ String jobid = getOutput(process.getInputStream());
+
+ getProcessPoller().addJob(
+ new Job(jobid, spec.isRedirected() ? stdout : null, spec
+ .isRedirected() ? stderr : null, exitcode, this));
+ }
+
+ private void error(String message) {
+ listener.processFailed(message);
+ }
+
+ protected void writeScript(Writer wr, String exitcodefile, String stdout,
+ String stderr) throws IOException {
+ if (spec.getStdInput() != null) {
+ throw new IOException("The Cobalt provider cannot redirect STDIN");
+ }
+ Iterator i = spec.getEnvironmentVariableNames().iterator();
+ while (i.hasNext()) {
+ String name = (String) i.next();
+ wr.write(name);
+ wr.write('=');
+ wr.write(quote(spec.getEnvironmentVariable(name)));
+ wr.write('\n');
+ }
+ wr.write(quote(spec.getExecutable()));
+ List args = spec.getArgumentsAsList();
+ if (args != null && args.size() > 0) {
+ wr.write(' ');
+ i = args.iterator();
+ while (i.hasNext()) {
+ wr.write(quote((String) i.next()));
+ wr.write(' ');
+ }
+ }
+ wr.write(" 1>" + quote(stdout) + ' ');
+ wr.write(" 2>" + quote(stderr) + '\n');
+ wr.write("/bin/echo $? >" + exitcodefile + '\n');
+ wr.close();
+ }
+
+ protected void addAttr(String attrName, String option, List l) {
+ Object value = spec.getAttribute(attrName);
+ if (value != null) {
+ l.add(option);
+ l.add(String.valueOf(value));
+ }
+ }
+
+ protected String[] buildCMDLine(String script) {
+ List l = new ArrayList();
+ l.add(cqsub);
+ addAttr("queue", "-q", l);
+ Collection names = spec.getEnvironmentVariableNames();
+ if (names != null && names.size() > 0) {
+ l.add("-e");
+ StringBuffer sb = new StringBuffer();
+ Iterator i = names.iterator();
+ while (i.hasNext()) {
+ String name = (String) i.next();
+ sb.append(name);
+ sb.append('=');
+ sb.append(quote(spec.getEnvironmentVariable(name)));
+ if (i.hasNext()) {
+ sb.append(':');
+ }
+ }
+ l.add(sb.toString());
+ }
+ addAttr("mode", "-m", l);
+ //We're gonna treat this as the process count
+ addAttr("count", "-c", l);
+ addAttr("project", "-p", l);
+ addAttr("queue", "-q", l);
+ addAttr("maxwalltime", "-t", l);
+ if (spec.getDirectory() != null) {
+ l.add("-C");
+ l.add(spec.getDirectory());
+ }
+ l.add("/bin/sh");
+ l.add(script);
+ return (String[]) l.toArray(EMPTY_STRING_ARRAY);
+ }
+
+ protected String quote(String s) {
+ boolean quotes = false;
+ if (s.indexOf(' ') != -1) {
+ quotes = true;
+ }
+ StringBuffer sb = new StringBuffer();
+ if (quotes) {
+ sb.append('"');
+ }
+ for (int i = 0; i < s.length(); i++) {
+ char c = s.charAt(i);
+ if (c == '"' || c == '\\') {
+ sb.append('\\');
+ break;
+ }
+ sb.append(c);
+ }
+ if (quotes) {
+ sb.append('"');
+ }
+ return sb.toString();
+ }
+
+ protected String getOutput(InputStream is) throws IOException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Waiting for output from qsub");
+ }
+ BufferedReader br = new BufferedReader(new InputStreamReader(is));
+ StringBuffer sb = new StringBuffer();
+ String out = br.readLine();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Output from qsub is: \"" + out + "\"");
+ }
+ if ("".equals(out)) {
+ throw new IOException("Qsub returned empty job ID");
+ }
+ return out;
+ }
+
+ protected void cleanup() {
+ script.delete();
+ new File(exitcode).delete();
+ if (spec.getStdOutput() == null && stdout != null) {
+ new File(stdout).delete();
+ }
+ if (spec.getStdError() == null && stderr != null) {
+ new File(stderr).delete();
+ }
+ }
+
+ public void processCompleted(int exitCode) {
+ cleanup();
+ if (listener != null) {
+ listener.processCompleted(exitCode);
+ }
+ }
+
+ public void processFailed(String message) {
+ cleanup();
+ if (listener != null) {
+ listener.processFailed(message);
+ }
+ }
+
+ public void statusChanged(int status) {
+ if (listener != null) {
+ listener.statusChanged(status);
+ }
+ }
+
+ public void stderrUpdated(String stderr) {
+ if (listener != null) {
+ listener.stderrUpdated(stderr);
+ }
+ }
+
+ public void stdoutUpdated(String stdout) {
+ if (listener != null) {
+ listener.stdoutUpdated(stdout);
+ }
+ }
+
+ public void processFailed(Exception e) {
+ cleanup();
+ if (listener != null) {
+ listener.processFailed(e);
+ }
+ }
+}
Added: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/Properties.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/Properties.java (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/Properties.java 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,80 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 20, 2005
+ */
+package org.globus.cog.abstraction.impl.scheduler.cobalt;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.log4j.Logger;
+
+public class Properties extends java.util.Properties {
+ private static Logger logger = Logger.getLogger(Properties.class);
+
+ public static final String PROPERTIES = "provider-cobalt.properties";
+
+ public static final String POLL_INTERVAL = "poll.interval";
+ public static final String CQSUB = "cqsub";
+ public static final String CQSTAT = "cqstat";
+
+ private static Properties properties;
+
+ public static synchronized Properties getProperties() {
+ if (properties == null) {
+ properties = new Properties();
+ properties.load();
+ }
+ return properties;
+ }
+
+ private void load() {
+ setDefaults();
+ InputStream is = getClass().getClassLoader().getResourceAsStream(PROPERTIES);
+ if (is == null) {
+ logger.warn("Could not find " + PROPERTIES + ". Using defaults.");
+ }
+ else {
+ try {
+ super.load(is);
+ }
+ catch (IOException e) {
+ }
+ }
+ }
+
+ private void setDefaults() {
+ setPollInterval(5);
+ setCQSub("cqsub");
+ setCQStat("cqstat");
+ }
+
+ public void setPollInterval(int value) {
+ setProperty(POLL_INTERVAL, String.valueOf(value));
+ }
+
+ public int getPollInterval() {
+ return Integer.parseInt(getProperty(POLL_INTERVAL));
+ }
+
+ public void setCQSub(String cqsub) {
+ setProperty(CQSUB, cqsub);
+ }
+
+ public String getCQSub() {
+ return getProperty(CQSUB);
+ }
+
+ public void setCQStat(String cqstat) {
+ setProperty(CQSTAT, cqstat);
+ }
+
+ public String getCQStat() {
+ return getProperty(CQSTAT);
+ }
+}
Added: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/QueuePoller.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/QueuePoller.java (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/QueuePoller.java 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,224 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 11, 2005
+ */
+package org.globus.cog.abstraction.impl.scheduler.cobalt;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+
+public class QueuePoller extends Thread {
+ public static final Logger logger = Logger.getLogger(QueuePoller.class);
+
+ private LinkedList newjobs, donejobs;
+ private Map jobs;
+ boolean any = false;
+ private int sleepTime;
+
+ public QueuePoller() {
+ setName("Cobalt-Local provider stream poller");
+ setDaemon(true);
+ jobs = new HashMap();
+ newjobs = new LinkedList();
+ donejobs = new LinkedList();
+ sleepTime = Properties.getProperties().getPollInterval() * 1000;
+ }
+
+ public void addJob(Job job) {
+ synchronized (newjobs) {
+ newjobs.add(job);
+ }
+ }
+
+ public void run() {
+ boolean empty;
+ while (true) {
+ while (jobs.size() + newjobs.size() == 0) {
+ try {
+ Thread.sleep(250);
+ }
+ catch (InterruptedException e) {
+ }
+ }
+ Exception exc = null;
+ pollQueue();
+ if (logger.isInfoEnabled()) {
+ logger.info("Active: " + jobs.size() + ", New: "
+ + newjobs.size() + ", Done: " + donejobs.size());
+ }
+ removeDoneJobs();
+ commitNewJobs();
+ try {
+ Thread.sleep(sleepTime);
+ }
+ catch (InterruptedException e) {
+ }
+ }
+ }
+
+ protected void commitNewJobs() {
+ if (newjobs.isEmpty()) {
+ return;
+ }
+ else {
+ synchronized (newjobs) {
+ while (!newjobs.isEmpty()) {
+ Job job = (Job) newjobs.removeFirst();
+ jobs.put(job.getJobID(), job);
+ }
+ }
+ }
+ }
+
+ protected void removeDoneJobs() {
+ if (donejobs.isEmpty()) {
+ return;
+ }
+ else {
+ while (!donejobs.isEmpty()) {
+ String jobid = (String) donejobs.removeFirst();
+ jobs.remove(jobid);
+ }
+ }
+ }
+
+ protected void failAll(Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Fail all ", e);
+ }
+ failAll(String.valueOf(e));
+ }
+
+ protected void failAll(String message) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Fail all: " + message);
+ }
+ Iterator i = jobs.values().iterator();
+ while (i.hasNext()) {
+ Job job = (Job) i.next();
+ try {
+ job.fail(message);
+ }
+ catch (Exception e) {
+ logger.warn("Could not fail job (" + job.getJobID() + ")", e);
+ }
+ }
+ jobs.clear();
+ }
+
+ public static final String[] CMDARRAY = new String[] {
+ Properties.getProperties().getCQStat(), "-f" };
+
+ protected void pollQueue() {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Polling queue");
+ }
+ if (jobs.size() == 0) {
+ return;
+ }
+
+ Process pqstat = Runtime.getRuntime().exec(CMDARRAY);
+ int ec = pqstat.waitFor();
+ if (ec != 0) {
+ failAll("cqstat failed (exit code " + ec + ")");
+ }
+ processStdout(pqstat.getInputStream());
+ processStderr(pqstat.getErrorStream());
+ }
+ catch (Exception e) {
+ failAll(e);
+ }
+ }
+
+ protected String parseToWhitespace(String s, int startindex) {
+ for (int i = startindex; i < s.length(); i++) {
+ if (Character.isWhitespace(s.charAt(i))) {
+ return s.substring(startindex, i);
+ }
+ }
+ return null;
+ }
+
+ private static Set processed = new HashSet();
+
+ protected void processStdout(InputStream is) throws IOException {
+ BufferedReader br = new BufferedReader(new InputStreamReader(is));
+ String line;
+ String header = br.readLine();
+ if (header == null) {
+ throw new IOException("Failed to read cqstat header");
+ }
+ int jobIDIndex = header.indexOf("JobID");
+ int stateIndex = header.indexOf("State");
+ if (jobIDIndex == -1 || stateIndex == -1) {
+ throw new IOException("Invalid cqstat header: " + header);
+ }
+ // skip the =====...
+ br.readLine();
+ processed.clear();
+ do {
+ line = br.readLine();
+ if (line != null) {
+ String jobid = parseToWhitespace(line, jobIDIndex);
+ String state = parseToWhitespace(line, stateIndex);
+ if (jobid == null || jobid.equals("") || state == null
+ || state.equals("")) {
+ throw new IOException("Failed to parse cqstat line: "
+ + line);
+ }
+ Job job = (Job) jobs.get(jobid);
+ if (job == null) {
+ continue;
+ }
+ processed.add(job);
+ if (state.equals("queued")) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Status for " + jobid + " is Q");
+ }
+ job.setState(Job.STATE_QUEUED);
+ }
+ else if (state.equals("running")) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Status for " + jobid + " is R");
+ }
+ job.setState(Job.STATE_RUNNING);
+ }
+ }
+ } while (line != null);
+ Iterator i = jobs.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ String id = (String) e.getKey();
+ if (!processed.contains(id)) {
+ Job job = (Job) e.getValue();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Status for " + id + " is Done");
+ }
+ job.setState(Job.STATE_DONE);
+ if (job.getState() == Job.STATE_DONE) {
+ donejobs.add(id);
+ }
+ }
+ }
+ }
+
+ protected void processStderr(InputStream is) throws IOException {
+ }
+}
Added: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/JobSubmissionTaskHandler.java (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/JobSubmissionTaskHandler.java 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,145 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.cobalt.execution;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.common.StatusImpl;
+import org.globus.cog.abstraction.impl.common.task.IllegalSpecException;
+import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
+import org.globus.cog.abstraction.impl.common.task.InvalidServiceContactException;
+import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
+import org.globus.cog.abstraction.impl.scheduler.cobalt.CobaltExecutor;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
+import org.globus.cog.abstraction.interfaces.DelegatedTaskHandler;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Status;
+import org.globus.cog.abstraction.interfaces.Task;
+
+public class JobSubmissionTaskHandler implements DelegatedTaskHandler,
+ ProcessListener {
+
+ private static Logger logger = Logger
+ .getLogger(JobSubmissionTaskHandler.class);
+
+ private Task task;
+ private JobSpecification spec;
+ private Thread thread;
+
+ public void submit(Task task) throws IllegalSpecException,
+ InvalidSecurityContextException, InvalidServiceContactException,
+ TaskSubmissionException {
+ if (this.task != null) {
+ throw new TaskSubmissionException(
+ "JobSubmissionTaskHandler cannot handle two active jobs simultaneously");
+ }
+ else {
+ this.task = task;
+ try {
+ spec = (JobSpecification) this.task.getSpecification();
+ }
+ catch (Exception e) {
+ throw new IllegalSpecException(
+ "Exception while retreiving Job Specification", e);
+ }
+
+ if (task.getAllServices() == null
+ || task.getAllServices().size() == 0
+ || task.getService(0) == null) {
+ throw new InvalidSecurityContextException(
+ "No service specified");
+ }
+
+ try {
+ new CobaltExecutor(task, this).start();
+ // check if the task has not been canceled after it was
+ // submitted for execution
+ if (this.task.getStatus().getStatusCode() == Status.UNSUBMITTED) {
+ this.task.setStatus(Status.SUBMITTED);
+ if (spec.isBatchJob()) {
+ this.task.setStatus(Status.COMPLETED);
+ }
+ }
+ }
+ catch (Exception e) {
+ Status newStatus = new StatusImpl();
+ Status oldStatus = this.task.getStatus();
+ newStatus.setPrevStatusCode(oldStatus.getStatusCode());
+ newStatus.setStatusCode(Status.FAILED);
+ newStatus.setException(e);
+ this.task.setStatus(newStatus);
+ if (e.getMessage() != null) {
+ throw new TaskSubmissionException("Cannot submit job: "
+ + e.getMessage(), e);
+ }
+ else {
+ throw new TaskSubmissionException("Cannot submit job", e);
+ }
+ }
+ }
+ }
+
+ public void suspend() throws InvalidSecurityContextException,
+ TaskSubmissionException {
+ }
+
+ public void resume() throws InvalidSecurityContextException,
+ TaskSubmissionException {
+ }
+
+ public void cancel() throws InvalidSecurityContextException,
+ TaskSubmissionException {
+ this.task.setStatus(Status.CANCELED);
+ }
+
+ public void processCompleted(int exitCode) {
+ if (task.getStatus().getStatusCode() != Status.FAILED) {
+ if (exitCode == 0) {
+ task.setStatus(Status.COMPLETED);
+ }
+ else {
+ Status s = new StatusImpl();
+ s.setMessage("Process failed with exit code " + exitCode);
+ s.setStatusCode(Status.FAILED);
+ task.setStatus(s);
+ }
+ }
+ }
+
+ public void processFailed(String message) {
+ Status s = new StatusImpl();
+ s.setMessage(message);
+ s.setStatusCode(Status.FAILED);
+ task.setStatus(s);
+ }
+
+ public void processFailed(Exception e) {
+ Status s = new StatusImpl();
+ s.setMessage(e.getMessage());
+ s.setException(e);
+ s.setStatusCode(Status.FAILED);
+ task.setStatus(s);
+ }
+
+ public void statusChanged(int status) {
+ if (status == Job.STATE_RUNNING) {
+ task.setStatus(Status.ACTIVE);
+ }
+ }
+
+ public void stderrUpdated(String stderr) {
+ if (spec.isRedirected()) {
+ task.setStdError(stderr);
+ }
+ }
+
+ public void stdoutUpdated(String stdout) {
+ if (spec.isRedirected()) {
+ task.setStdOutput(stdout);
+ }
+ }
+}
\ No newline at end of file
Added: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/TaskHandlerImpl.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/TaskHandlerImpl.java (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/cobalt/execution/TaskHandlerImpl.java 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,28 @@
+
+// ----------------------------------------------------------------------
+// This code is developed as part of the Java CoG Kit project
+// The terms of the license can be found at http://www.cogkit.org/license
+// This message may not be removed or altered.
+// ----------------------------------------------------------------------
+
+package org.globus.cog.abstraction.impl.scheduler.cobalt.execution;
+
+import org.globus.cog.abstraction.interfaces.DelegatedTaskHandler;
+
+/**
+ *Provides a local Cobalt <code>TaskHandler</code>
+ *for job submission to the local resource without
+ *any security context.
+ *
+ */
+public class TaskHandlerImpl extends
+ org.globus.cog.abstraction.impl.common.execution.TaskHandlerImpl {
+
+ protected DelegatedTaskHandler newDelegatedTaskHandler() {
+ return new JobSubmissionTaskHandler();
+ }
+
+ protected String getName() {
+ return "Cobalt";
+ }
+}
\ No newline at end of file
Added: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/Job.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/Job.java (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/Job.java 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,189 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 11, 2005
+ */
+package org.globus.cog.abstraction.impl.scheduler.common;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+
+public class Job {
+ public static final Logger logger = Logger.getLogger(Job.class);
+
+ public static final int STATE_NONE = 0;
+ public static final int STATE_QUEUED = 1;
+ public static final int STATE_RUNNING = 2;
+ public static final int STATE_DONE = 3;
+ public static final int STATE_UNKNOWN = 4;
+
+ private String jobID;
+ private String exitcodeFileName;
+ private String stdout, stderr;
+ private ProcessListener listener;
+ private int state;
+ private int ticks;
+
+ public Job(String jobID, String stdout, String stderr,
+ String exitcodeFileName, ProcessListener listener) {
+ this.jobID = jobID;
+ this.listener = listener;
+ this.stdout = stdout;
+ this.stderr = stderr;
+ this.state = STATE_NONE;
+ this.exitcodeFileName = exitcodeFileName;
+ this.ticks = 0;
+ }
+
+ public boolean close() {
+ if (!processStdout() || !processStderr()) {
+ return true;
+ }
+ File f = new File(exitcodeFileName);
+ if (!f.exists()) {
+ if (ticks == 5) {
+ listener
+ .processFailed(new ProcessException(
+ "Exitcode file not found 5 queue polls after the job was reported done"));
+ return true;
+ }
+ else {
+ ticks++;
+ return false;
+ }
+ }
+ else {
+ processExitCode();
+ return true;
+ }
+ }
+
+ protected boolean processExitCode() {
+
+ int exitcode;
+ try {
+ exitcode = Integer.parseInt(new BufferedReader(new FileReader(
+ exitcodeFileName)).readLine());
+ listener.processCompleted(exitcode);
+ return true;
+ }
+ catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Exception caught while reading exit code", e);
+ }
+ listener.processFailed(new ProcessException(
+ "Exception caught while reading exit code", e));
+ return false;
+ }
+ }
+
+ protected boolean processStdout() {
+ try {
+ if (stdout != null) {
+ String out = readFile(stdout);
+ if (out != null && !"".equals(out)) {
+ listener.stdoutUpdated(out);
+ }
+ }
+ return true;
+ }
+ catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Exception caught while reading STDOUT", e);
+ }
+ listener.processFailed(new ProcessException(
+ "Exception caught while reading STDOUT", e));
+ return false;
+ }
+ }
+
+ protected boolean processStderr() {
+ try {
+ if (stderr != null) {
+ String err = readFile(stderr);
+ if (err != null && !"".equals(err)) {
+ listener.stderrUpdated(err);
+ }
+ }
+ return true;
+ }
+ catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Exception caught while reading STDERR", e);
+ }
+ listener.processFailed(new ProcessException(
+ "Exception caught while reading STDERR", e));
+ return false;
+ }
+ }
+
+ public synchronized void await() {
+ while (state != STATE_DONE) {
+ try {
+ wait();
+ }
+ catch (InterruptedException e) {
+ }
+ }
+ }
+
+ public String getJobID() {
+ return jobID;
+ }
+
+ public void setState(int state) {
+ if (state == this.state) {
+ return;
+ }
+ else {
+ if (state == STATE_DONE) {
+ if (close()) {
+ this.state = STATE_DONE;
+ synchronized (this) {
+ notify();
+ }
+ }
+ }
+ else {
+ this.state = state;
+ if (listener != null) {
+ listener.statusChanged(state);
+ }
+ }
+ }
+ }
+
+ public int getState() {
+ return state;
+ }
+
+ public void fail(String message) {
+ listener.processFailed(message);
+ }
+
+ protected String readFile(String name) throws IOException {
+ File f = new File(name);
+ if (!f.exists()) {
+ return null;
+ }
+ BufferedReader br = new BufferedReader(new FileReader(f));
+ StringBuffer sb = new StringBuffer();
+ String line;
+ do {
+ line = br.readLine();
+ if (line != null) {
+ sb.append(line);
+ sb.append('\n');
+ }
+ } while (line != null);
+ return sb.toString();
+ }
+}
Added: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/ProcessException.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/ProcessException.java (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/ProcessException.java 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,30 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 19, 2005
+ */
+package org.globus.cog.abstraction.impl.scheduler.common;
+
+public class ProcessException extends Exception {
+ private static final long serialVersionUID = -3254385200849810535L;
+
+ public ProcessException() {
+ super();
+ }
+
+ public ProcessException(String message) {
+ super(message);
+ }
+
+ public ProcessException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ProcessException(Throwable cause) {
+ super(cause);
+ }
+}
Added: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/ProcessListener.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/ProcessListener.java (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/common/ProcessListener.java 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,24 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 15, 2005
+ */
+package org.globus.cog.abstraction.impl.scheduler.common;
+
+public interface ProcessListener {
+ void processCompleted(int exitCode);
+
+ void processFailed(String message);
+
+ void processFailed(Exception e);
+
+ void stdoutUpdated(String stdout);
+
+ void stderrUpdated(String stderr);
+
+ void statusChanged(int status);
+}
Added: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/PBSExecutor.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/PBSExecutor.java (rev 0)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/PBSExecutor.java 2007-02-12 23:55:03 UTC (rev 1575)
@@ -0,0 +1,258 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 11, 2005
+ */
+package org.globus.cog.abstraction.impl.scheduler.pbs;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.scheduler.common.Job;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessException;
+import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Task;
+import org.globus.gsi.gssapi.auth.AuthorizationException;
+import org.ietf.jgss.GSSException;
+
+public class PBSExecutor implements ProcessListener {
+ public static final Logger logger = Logger.getLogger(PBSExecutor.class);
+
+ private JobSpecification spec;
+ private Task task;
+ private static QueuePoller poller;
+ private ProcessListener listener;
+ private String stdout, stderr, exitcode;
+ private File script;
+
+ public PBSExecutor(Task task, ProcessListener listener) {
+ this.task = task;
+ this.spec = (JobSpecification) task.getSpecification();
+ this.listener = listener;
+ }
+
+ private static synchronized QueuePoller getProcessPoller() {
+ if (poller == null) {
+ poller = new QueuePoller();
+ poller.start();
+ }
+ return poller;
+ }
+
+ public void start() throws AuthorizationException, GSSException,
+ IOException, ProcessException {
+ File scriptdir = new File(System.getProperty("user.home")
+ + File.separatorChar + ".globus" + File.separatorChar
+ + "scripts");
+ scriptdir.mkdirs();
+ if (!scriptdir.exists()) {
+ throw new IOException("Failed to create script directory ("
+ + scriptdir + ")");
+ }
+ script = File.createTempFile("pbs", ".qsub", scriptdir);
+ stdout = spec.getStdOutput() == null ? script.getAbsolutePath()
+ + ".stdout" : spec.getStdOutput();
+ stderr = spec.getStdError() == null ? script.getAbsolutePath()
+ + ".stderr" : spec.getStdError();
+ exitcode = script.getAbsolutePath() + ".exitcode";
+ writePBSScript(new BufferedWriter(new FileWriter(script)), exitcode,
+ stdout, stderr);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Wrote PBS script to " + script);
+ }
+
+ Process process = Runtime.getRuntime().exec(
+ new String[] { Properties.getProperties().getQSub(),
+ script.getAbsolutePath() }, null, null);
+
+ try {
+ process.getOutputStream().close();
+ }
+ catch (IOException e) {
+ }
+
+ try {
+ int code = process.waitFor();
+ if (code != 0) {
+ throw new ProcessException(
+ "Could not submit job (qsub reported an exit code of "
+ + code + "). "
+ + getOutput(process.getErrorStream()));
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("QSub done (exit code " + code + ")");
+ }
+ }
+ catch (InterruptedException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Interrupted exception while waiting for qsub", e);
+ }
+ if (listener != null) {
+ listener
+ .processFailed("The submission process was interrupted");
+ }
+ }
+
+ String jobid = getOutput(process.getInputStream());
+
+ getProcessPoller().addJob(
+ new Job(jobid, spec.isRedirected() ? stdout : null, spec
+ .isRedirected() ? stderr : null, exitcode, this));
+ }
+
+ private void error(String message) {
+ listener.processFailed(message);
+ }
+
+ protected void writeAttr(String attrName, String arg, Writer wr) throws IOException {
+ Object value = spec.getAttribute(attrName);
+ if (value != null) {
+ wr.write("#PBS "+arg + String.valueOf(value) + '\n');
+ }
+ }
+
+ protected void writePBSScript(Writer wr, String exitcodefile,
+ String stdout, String stderr) throws IOException {
+ wr.write("#PBS -S /bin/sh\n");
+ wr.write("#PBS -N " + task.getName() + '\n');
+ wr.write("#PBS -m n\n");
+ writeAttr("count", "-l nodes=", wr);
+ writeAttr("maxwalltime", "-l walltime=", wr);
+ writeAttr("queue", "-q ", wr);
+ if (spec.getDirectory() != null) {
+ wr.write("#PBS -d " + quote(spec.getDirectory()) + '\n');
+ }
+ if (spec.getStdInput() != null) {
+ throw new IOException("The PBSlocal provider cannot redirect STDIN");
+ }
+ wr.write("#PBS -o " + quote(stdout) + '\n');
+ wr.write("#PBS -e " + quote(stderr) + '\n');
+ Iterator i = spec.getEnvironmentVariableNames().iterator();
+ while (i.hasNext()) {
+ String name = (String) i.next();
+ wr.write(name);
+ wr.write('=');
+ wr.write(quote(spec.getEnvironmentVariable(name)));
+ wr.write('\n');
+ }
+ wr.write(quote(spec.getExecutable()));
+ List args = spec.getArgumentsAsList();
+ if (args != null && args.size() > 0) {
+ wr.write(' ');
+ i = args.iterator();
+ while (i.hasNext()) {
+ wr.write(quote((String) i.next()));
+ if (i.hasNext()) {
+ wr.write(' ');
+ }
+ }
+ }
+ wr.write('\n');
+
+ wr.write("/bin/echo $? >" + exitcodefile + '\n');
+ wr.close();
+ }
+
+ protected String quote(String s) {
+ boolean quotes = false;
+ if (s.indexOf(' ') != -1) {
+ quotes = true;
+ }
+ StringBuffer sb = new StringBuffer();
+ if (quotes) {
+ sb.append('"');
+ }
+ for (int i = 0; i < s.length(); i++) {
+ char c = s.charAt(i);
+ if (c == '"' || c == '\\') {
+ sb.append('\\');
+ break;
+ }
+ sb.append(c);
+ }
+ if (quotes) {
+ sb.append('"');
+ }
+ return sb.toString();
+ }
+
+ protected String getOutput(InputStream is) throws IOException {
+ if (logger.isDebugEnabled()) {
+ logger.de...
[truncated message content] |