|
From: <ha...@us...> - 2008-04-22 14:05:38
|
Revision: 1967
http://cogkit.svn.sourceforge.net/cogkit/?rev=1967&view=rev
Author: hategan
Date: 2008-04-22 07:05:35 -0700 (Tue, 22 Apr 2008)
Log Message:
-----------
added coaster provider
Added Paths:
-----------
trunk/current/src/cog/modules/provider-coaster/
trunk/current/src/cog/modules/provider-coaster/CHANGES.txt
trunk/current/src/cog/modules/provider-coaster/build.xml
trunk/current/src/cog/modules/provider-coaster/dependencies.xml
trunk/current/src/cog/modules/provider-coaster/etc/
trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.head
trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.tail
trunk/current/src/cog/modules/provider-coaster/etc/log4j.properties.module
trunk/current/src/cog/modules/provider-coaster/launchers.xml
trunk/current/src/cog/modules/provider-coaster/lib/
trunk/current/src/cog/modules/provider-coaster/lib/index.html
trunk/current/src/cog/modules/provider-coaster/libexec/
trunk/current/src/cog/modules/provider-coaster/libexec/bootstrap.sh
trunk/current/src/cog/modules/provider-coaster/meta/
trunk/current/src/cog/modules/provider-coaster/meta/description.txt
trunk/current/src/cog/modules/provider-coaster/project.properties
trunk/current/src/cog/modules/provider-coaster/resources/
trunk/current/src/cog/modules/provider-coaster/resources/coaster-bootstrap.list
trunk/current/src/cog/modules/provider-coaster/resources/cog-provider.properties
trunk/current/src/cog/modules/provider-coaster/resources/log4j.properties
trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
trunk/current/src/cog/modules/provider-coaster/src/
trunk/current/src/cog/modules/provider-coaster/src/org/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/JobStatusCommand.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/Registering.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/RegistrationCommand.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/VersionCommand.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/CoasterQueueProcessor.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/CoasterTaskHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/IDGenerator.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/JobQueue.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/LocalQueueProcessor.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/LocalTaskHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/QueueProcessor.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TCPTest.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TaskNotifier.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WallTime.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerKey.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerTaskMonitor.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalRequestManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/RegistrationHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/UnregisterHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/VersionHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/BootstrapService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/CancelJobCommand.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/CoasterChannelManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Digester.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/PackageList.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/PortManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/SubmitJobCommand.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/TaskHandlerImpl.java
Added: trunk/current/src/cog/modules/provider-coaster/CHANGES.txt
===================================================================
Added: trunk/current/src/cog/modules/provider-coaster/build.xml
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/build.xml (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/build.xml 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,178 @@
+<project name="Java CoG Kit" default="dist" basedir=".">
+
+ <property file="project.properties"/>
+ <property name="cog.dir" value="${basedir}/../../"/>
+ <property name="main.buildfile" value="${cog.dir}/mbuild.xml"/>
+ <property name="dist.dir" value="${cog.dir}/modules/${module.name}/dist/${module.name}-${version}"/>
+ <property name="build.dir" value="${cog.dir}/modules/${module.name}/build"/>
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Help | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="help">
+ <echo>
+ Available targets:
+ help:
+ prints out this help message
+
+ dist:
+ creates a distribution directory of the
+ ${project} ${long.name}
+
+ jar:
+ creates a jar file for the ${project} ${long.name}
+ named ${jar.filename}
+
+ javadoc:
+ creates the documentation
+
+ clean:
+ removes the compiled classes
+
+ distclean:
+ deletes the distribution directory
+
+ all:
+ dist and javadoc
+
+ deploy.webstart:
+ deploys the module as a webstart application
+
+ dist.joint:
+ builds everything into one jar file. Should only
+ be used globally (from all)
+
+ fixeol:
+ change newlines to the unix standard
+ </echo>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Dist | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="dist">
+ <ant antfile="${main.buildfile}" target="dist"/>
+ <ant antfile="${cog.dir}/modules/${module.name}/build.xml" target="bootstrap.jar"/>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Compile | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="compile">
+ <ant antfile="${main.buildfile}" target="compile"/>
+ </target>
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Clean | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="clean">
+ <ant antfile="${main.buildfile}" target="clean"/>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Distclean | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="distclean">
+ <ant antfile="${main.buildfile}" target="distclean"/>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Jar | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="jar">
+ <ant antfile="${main.buildfile}" target="jar"/>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Javadoc | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="javadoc">
+ <ant antfile="${main.buildfile}" target="javadoc"/>
+ </target>
+
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | PMD | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="pmd">
+ <ant antfile="${main.buildfile}" target="pmd"/>
+ </target>
+
+ <target name="deploy.webstart">
+ <ant antfile="${main.buildfile}" target="deploy.webstart"/>
+ </target>
+
+ <target name="replacelibs">
+ <ant antfile="${main.buildfile}" target="replacelibs"/>
+ </target>
+
+ <target name="webstart.launchers">
+ <ant antfile="${main.buildfile}" target="webstart.launchers"/>
+ </target>
+
+ <target name="dist.joint">
+ <ant antfile="${main.buildfile}" target="dist.all"/>
+ </target>
+
+ <target name="module.package">
+ <ant antfile="${main.buildfile}" target="module.package"/>
+ </target>
+
+ <!-- ================================================ -->
+ <!-- fixeol -->
+ <!-- ================================================ -->
+
+ <target name="fixeol">
+ <ant antfile="${main.buildfile}" target="fixeol"/>
+ </target>
+
+ <target name="bootstrap.jar" depends="package.list">
+ <echo message="[${module.name}]: BOOTSTRAP JAR"/>
+ <mkdir dir="${build.dir}/etc.tmp"/>
+ <concat destfile="${build.dir}/etc.tmp/MANIFEST.MF">
+ <filelist dir="etc" files="MANIFEST.MF.head,MANIFEST.MF.tail"/>
+ </concat>
+ <jar jarfile="${dist.dir}/lib/coaster-bootstrap.jar" manifest="${build.dir}/etc.tmp/MANIFEST.MF" index="true">
+ <fileset dir="${build.dir}" includes="**/*.*" excludes="*.tmp, *.tmp/**, *.tmp/**/*.*"/>
+ </jar>
+ </target>
+
+ <target name="package.list">
+ <echo message="[${module.name}]: PACKAGE LIST"/>
+ <java
+ classpath="${build.dir}"
+ classname="org.globus.cog.abstraction.impl.execution.coaster.PackageList"
+ fork="true" output="${build.dir}/coaster-bootstrap.list" logError="true" failonerror="true">
+
+ <arg value="${dist.dir}/lib"/>
+ </java>
+ <copy file="${build.dir}/coaster-bootstrap.list" tofile="${dist.dir}/lib/coaster-bootstrap.list"/>
+ <copy file="${build.dir}/coaster-bootstrap.list" tofile="${cog.dir}/modules/${module.name}/lib/coaster-bootstrap.list"/>
+ <copy file="${build.dir}/coaster-bootstrap.list" tofile="${cog.dir}/modules/${module.name}/resources/coaster-bootstrap.list"/>
+ </target>
+
+</project>
+
+
Added: trunk/current/src/cog/modules/provider-coaster/dependencies.xml
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/dependencies.xml (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/dependencies.xml 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,14 @@
+<project name="Project dependencies" default="deps" basedir=".">
+ <!-- project dependencies -->
+ <target name="deps">
+ <ant antfile="${main.buildfile}" target="dep">
+ <property name="module" value="abstraction-common"/>
+ </ant>
+ <ant antfile="${main.buildfile}" target="dep">
+ <property name="module" value="jglobus"/>
+ </ant>
+ <ant antfile="${main.buildfile}" target="dep">
+ <property name="module" value="karajan"/>
+ </ant>
+ </target>
+</project>
Added: trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.head
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.head (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.head 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1 @@
+Manifest-Version: 1.0
Added: trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.tail
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.tail (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.tail 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1 @@
+Main-Class: org.globus.cog.abstraction.impl.execution.coaster.Bootstrap
Added: trunk/current/src/cog/modules/provider-coaster/etc/log4j.properties.module
===================================================================
Added: trunk/current/src/cog/modules/provider-coaster/launchers.xml
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/launchers.xml (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/launchers.xml 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,6 @@
+<project name="Launchers" default="create" basedir=".">
+ <target name="create">
+ </target>
+ <target name="webstart">
+ </target>
+</project>
Added: trunk/current/src/cog/modules/provider-coaster/lib/index.html
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/lib/index.html (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/lib/index.html 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,10 @@
+<html>
+ <head>
+ <title>Coaster Bootstrap Service</title>
+ </head>
+ <body>
+ <p>
+ Go away!
+ </p>
+ </body>
+</html>
Added: trunk/current/src/cog/modules/provider-coaster/libexec/bootstrap.sh
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/libexec/bootstrap.sh (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/libexec/bootstrap.sh 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,45 @@
+BS=$0
+LS=$1
+EMD5=$2
+LMD5=$3
+ID=$4
+H=$5
+L=$6
+error() {
+ echo $1
+ echo $1 >>$L
+ rm -f $DJ
+ exit 1
+}
+if [ "$L" == "" ]; then
+ L=~/coaster-boot-$ID.log
+fi
+DJ=`mktemp bootstrap.XXXXXX`
+echo "BS: $BS" >>$L
+wget -c -q $BS/coaster-bootstrap.jar -O $DJ >>$L 2>&1
+if [ "$?" != "0" ]; then
+ error "Failed to download bootstrap jar from $BS"
+fi
+AMD5=`/usr/bin/md5sum $DJ`
+echo "Expected checksum: $EMD5" >>$L
+echo "Computed checksum: ${AMD5:0:32}" >>$L
+if [ "${AMD5:0:32}" != "$EMD5" ]; then
+ error "Bootstrap jar checksum failed: $EMD5 != ${AMD5:0:32}"
+fi
+
+if [ "$JAVA_HOME" != "" ]; then
+ JAVA=$JAVA_HOME/bin/java
+else
+ JAVA=`which java`
+fi
+echo "JAVA=$JAVA" >>$L
+if [ -x $JAVA ]; then
+ echo "$JAVA -Djava.home="$JAVA_HOME" -DX509_USER_PROXY="$X509_USER_PROXY" -DGLOBUS_HOSTNAME="$H" -jar $DJ $BS $LMD5 $LS $ID" >>$L
+ $JAVA -Djava.home="$JAVA_HOME" -DX509_USER_PROXY="$X509_USER_PROXY" -DGLOBUS_HOSTNAME="$H" -jar $DJ $BS $LMD5 $LS $ID >>$L 2>&1
+ EC=$?
+ echo "Exit code: $EC" >>$L
+ rm -f $DJ
+ exit $EC
+else
+ error "Could not find a valid java executable"
+fi
Added: trunk/current/src/cog/modules/provider-coaster/meta/description.txt
===================================================================
Added: trunk/current/src/cog/modules/provider-coaster/project.properties
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/project.properties (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/project.properties 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,7 @@
+module.name = provider-coaster
+long.name = Coasters
+version = 0.1
+project = Java CoG Kit
+lib.deps = -
+debug = true
+
Added: trunk/current/src/cog/modules/provider-coaster/resources/coaster-bootstrap.list
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/coaster-bootstrap.list (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/resources/coaster-bootstrap.list 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,22 @@
+backport-util-concurrent.jar f9c59530e5d6ca38f3ba6c0b6213e016
+cog-abstraction-common-2.2.jar dcc9dc2c0d73f4d9e0c1e62877e4c26a
+cog-jglobus-dev-080222.jar d87a8fb09be6d8011f6492feabce475d
+cog-karajan-0.36-dev.jar 7a2e83affcdc1ecb9bc771d71e4a3181
+cog-provider-coaster-0.1.jar 79a39f5d73aa2d09c59b1106f8eea4cf
+cog-provider-gt2-2.3.jar cb42a53716cfcbdba71321f58ee80337
+cog-provider-gt4_0_0-2.4.jar 48d9507bae0f958d0a8bdfdefbce7582
+cog-provider-local-2.1.jar 9294397d24948bd5dda8759e52e4c22e
+cog-provider-localscheduler-0.2.jar 3a2e8d438d3338cad0afa3e77ebec2cd
+cog-provider-ssh-2.3.jar e798e3e843f45217f555fe6ef17a4345
+cog-util-0.92.jar ad5946aa250148262f1d15474cc34ecc
+commons-logging-1.1.jar 6b62417e77b000a87de66ee3935edbf5
+cryptix-asn1.jar 87c4cf848c81d102bd29e33681b80e8a
+cryptix.jar c3dad86be114c7aaf2ddf32c8e52184a
+cryptix32.jar 59772ad239684bf10ae8fe71f4dbae22
+j2ssh-common-0.2.2.jar d65a51ea6f64efc066915c1618c613ca
+j2ssh-core-0.2.2-patched.jar 9bf1ffb8ab700234649f70ef4a35f029
+jaxrpc.jar 8e7d80b5d77dff6ed2f41352e9147101
+jce-jdk13-131.jar 06fc7049669d16c4001a452e100b401f
+jgss.jar 9cccfd21259791b509af229a0181f207
+log4j-1.2.8.jar 18a4ca847248e5b8606325684342701c
+puretls.jar 90b9c31c201243b9f4a24fa11d404702
Added: trunk/current/src/cog/modules/provider-coaster/resources/cog-provider.properties
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/cog-provider.properties (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/resources/cog-provider.properties 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,6 @@
+provider=coaster
+sandbox=false
+executionTaskHandler=org.globus.cog.abstraction.impl.execution.local.TaskHandlerImpl
+securityContext=org.globus.cog.abstraction.impl.common.task.SecurityContextImpl
+
+
Added: trunk/current/src/cog/modules/provider-coaster/resources/log4j.properties
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/log4j.properties (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/resources/log4j.properties 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,19 @@
+# Set root category priority to WARN and its only appender to CONSOLE.
+log4j.rootCategory=WARN, CONSOLE
+
+# A1 is set to be a ConsoleAppender.
+#log4j.appender.DEBUG=org.apache.log4j.ConsoleAppender
+
+# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+#log4j.appender.CONSOLE.Threshold=WARN
+log4j.appender.CONSOLE.layout.ConversionPattern=[%C{1}] %-5p %t %x - %m%n
+#log4j.appender.CONSOLE.layout.ConversionPattern=%-5p [%C{1}] %x - %m%n
+
+
+log4j.logger.org.globus.cog.abstraction=WARN
+log4j.logger.org.apache.axis.utils.JavaUtils=ERROR
+log4j.logger.org.globus.cog.abstraction.impl.execution.coaster.JobSubmissionTaskHandler=INFO
+log4j.logger.org.globus.cog.karajan.workflow.service=DEBUG
+log4j.logger.org.globus.cog.abstraction.coaster=INFO
\ No newline at end of file
Added: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/worker.pl (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,412 @@
+#!/usr/bin/perl
+use IO::Socket;
+use strict;
+use warnings;
+
+
+my $REPLY_FLAG = 0x00000001;
+my $FINAL_FLAG = 0x00000002;
+my $ERROR_FLAG = 0x00000004;
+
+my $COMPLETED = 7;
+my $FAILED = 5;
+my $ACTIVE = 2;
+
+my $TAG = 0;
+my $RETRIES = 3;
+my $REPLYTIMEOUT = 60;
+my $MAXFRAGS = 16;
+
+my $IDLETIMEOUT = 300; #Seconds
+my $LASTRECV = 0;
+
+my $BUFSZ = 2048;
+
+my %REQUESTS = ();
+my %REPLIES = ();
+
+my $LOG = "$ENV{HOME}/worker$ARGV[0].log";
+
+
+my %HANDLERS = (
+ "SHUTDOWN" => \&shutdown,
+ "SUBMITJOB" => \&submitjob,
+ "REGISTER" => \®ister,
+);
+
+my @CMDQ = ();
+
+my $ID=$ARGV[0];
+my $URI=$ARGV[1];
+my $SCHEME;
+my $HOSTNAME;
+my $PORT;
+if ($URI =~ /(.*):\/\//) { $SCHEME = $1; } else { die "Could not parse url scheme: $URI"; }
+if ($URI =~ /.*:\/\/(.*):/) { $HOSTNAME = $1; } else { die "Could not parse url hostname: $URI"; }
+if ($URI =~ /.*:\/\/.*:(.*)/) { $PORT = $1; } else { die "Could not parse url port: $URI"; }
+my $SOCK;
+
+my $JOBID;
+my %JOB;
+my %JOBENV;
+my @JOBARGS;
+
+sub wlog {
+ my $msg;
+ foreach $msg (@_) {
+ print LOG time(), " ", $msg;
+ #print $msg;
+ }
+}
+
+sub init() {
+ my $fail = 0;
+
+ open(LOG, ">$LOG") or die "Failed to open log file: $!";
+ my $b = select(LOG);
+ $| = 1;
+ select($b);
+ print LOG time(), " Logging started\n";
+
+ wlog "uri=$URI, scheme=$SCHEME, host=$HOSTNAME, port=$PORT, id=$ID\n";
+ for ($_ = 0; $_ < 10; $_++) {
+ $SOCK = IO::Socket::INET->new(Proto=>'tcp', PeerAddr=>$HOSTNAME, PeerPort=>$PORT) || ($fail = 1);
+ if (!$fail) {
+ $SOCK->setsockopt(SOL_SOCKET, SO_RCVBUF, 16384);
+ $SOCK->setsockopt(SOL_SOCKET, SO_SNDBUF, 32768);
+ last;
+ }
+ }
+ if ($fail) {
+ die "Failed to create sockets: $!";
+ }
+}
+
+
+sub sendm {
+ my ($tag, $flags, $msg) = @_;
+ my $len = length($msg);
+ my $buf = pack("VVV", $tag, $flags, $len);
+ $buf = $buf.$msg;
+ wlog("> len=$len, tag=$tag, flags=$flags, $msg\n");
+ $SOCK->send($buf) == length($buf) or die "cannot send to $SOCK: $!";
+}
+
+sub sendFrags {
+ my ($tag, $flg, @msgs) = @_;
+
+ for (my $i = 0; $i <= $#msgs; $i++) {
+ sendm($tag, ($i < $#msgs) ? $flg : ($FINAL_FLAG | $flg), $msgs[$i]);
+ }
+}
+
+sub sendCmd {
+ my @cmd = @_;
+ my $cont = shift(@cmd);
+ my $ctag = $TAG++;
+
+ registerCmd($ctag, $cont);
+ sendFrags($ctag, 0, @cmd);
+}
+
+sub sendReply {
+ my ($tag, @msgs) = @_;
+
+ sendFrags($tag, $REPLY_FLAG, @msgs);
+}
+
+sub sendError {
+ my ($tag, @msgs) = @_;
+
+ sendFrags($tag, $REPLY_FLAG | $ERROR_FLAG, @msgs);
+}
+
+sub unpackData {
+ my ($data) = @_;
+
+ my $lendata = length($data);
+ if ($lendata < 12) {
+ die "Received faulty message (length < 12: $lendata)";
+ }
+ my $tag = unpack("V", substr($data, 0, 4));
+ my $flg = unpack("V", substr($data, 4, 4));
+ my $len = unpack("V", substr($data, 8, 4));
+ my $msg;
+ $SOCK->recv($msg, $len);
+
+ wlog("< len=$len, tag=$tag, flags=$flg, $data\n");
+ return ($tag, $flg, $msg);
+}
+
+sub processRequest {
+ my ($tag, $timeout, $err, $request) = @_;
+
+ if ($timeout) {
+ sendError($tag, ("Timed out waiting for all fragments"));
+ }
+ else {
+ wlog "Processing request\n";
+ my $cmd = shift(@$request);
+ wlog "Cmd is $cmd\n";
+ if (exists($HANDLERS{$cmd})) {
+ $HANDLERS{$cmd}->($tag, 0, $request);
+ }
+ else {
+ sendError($tag, ("Unknown command: $cmd"));
+ }
+ }
+}
+
+sub process {
+ my ($tag, $flg, $msg) = @_;
+
+
+ my $reply = $flg & $REPLY_FLAG;
+ my ($record, $cont, $start, $frags);
+
+ if ($reply) {
+ if (exists($REPLIES{$tag})) {
+ $record = $REPLIES{$tag};
+ ($cont, $start, $frags) = ($record->[0], $record->[1], $record->[2]);
+ }
+ else {
+ wlog("Warning: received reply to unregistered command (tag=$tag). Discarding.\n");
+ return;
+ }
+ }
+ else {
+ if (!exists($REQUESTS{$tag})) {
+ $REQUESTS{$tag} = [\&processRequest, time(), []];
+ wlog "New request ($tag)\n";
+ }
+ $record = $REQUESTS{$tag};
+ ($cont, $start, $frags) = ($$record[0], $$record[1], $$record[2]);
+ }
+
+ my $fin = $flg & $FINAL_FLAG;
+ my $err = $flg & $ERROR_FLAG;
+
+ push @$frags, $msg;
+
+ if ($fin) {
+ if ($reply) {
+ delete($REPLIES{$tag});
+ }
+ else {
+ delete($REQUESTS{$tag});
+ }
+ $cont->($tag, 0, $err, $frags);
+ }
+}
+
+sub checkTimeouts2 {
+ my ($hash) = @_;
+
+ my $now = time();
+ my @del = ();
+
+ my $k;
+ my $v;
+
+ while (($k, $v) = each(%$hash)) {
+ if ($now - $$v[1] > $REPLYTIMEOUT) {
+ push(@del, $k);
+ my $cont = $$v[0];
+ $cont->($k, 1, 0, ());
+ }
+ }
+
+ foreach $k (@del) {
+ delete $$hash{$k};
+ }
+}
+
+sub checkTimeouts {
+ checkTimeouts2(\%REQUESTS);
+ checkTimeouts2(\%REPLIES);
+ if ($LASTRECV != 0) {
+ my $dif = time() - $LASTRECV;
+ if ($dif >= $IDLETIMEOUT) {
+ die "Idle time exceeded";
+ }
+ }
+}
+
+sub recvOne {
+ my $data;
+ $SOCK->recv($data, 12);
+ if (length($data) > 0) {
+ wlog "Received $data\n";
+ eval { process(unpackData($data)); } || wlog "$@\n";
+ $LASTRECV = time();
+ }
+ else {
+ #sleep 250ms
+ select(undef, undef, undef, 0.25);
+ checkTimeouts();
+ }
+}
+
+sub registerCmd {
+ my ($tag, $cont) = @_;
+
+ $REPLIES{$tag} = [$cont, time(), ()];
+}
+
+
+sub mainloop {
+ my $cmd;
+ while(1) {
+ foreach $cmd (@CMDQ) {
+ sendCmd(@$cmd);
+ }
+ @CMDQ = ();
+ recvOne();
+ }
+}
+
+sub queueCmd {
+ push @CMDQ, [@_];
+}
+
+sub printreply {
+ my ($tag, $timeout, $err, $reply) = @_;
+ if ($timeout) {
+ wlog "Timed out waiting for reply to $tag\n";
+ }
+ else {
+ wlog "$$reply[0]\n";
+ }
+}
+
+sub nullCB {
+ my ($tag, $timeout, $err, $reply) = @_;
+}
+
+sub registerCB {
+ my ($tag, $timeout, $err, $reply) = @_;
+
+ if ($timeout) {
+ die "Failed to register (timeout)\n";
+ }
+ if ($err) {
+ die "Failed to register (service returned error: ".join("\n", @$reply).")";
+ }
+}
+
+sub register {
+ my ($tag, $timeout, $reply) = @_;
+ sendReply($tag, ("OK"));
+}
+
+
+sub shutdown {
+ my ($tag, $timeout, $msgs) = @_;
+
+ sendReply($tag, ("OK"));
+ wlog "Shutdown command received. Exiting\n";
+ exit 0;
+}
+
+sub submitjob {
+ my ($tag, $timeout, $msgs) = @_;
+ my $desc = $$msgs[0];
+ my @lines = split(/\n/, $desc);
+ my $line;
+ $JOBID = undef;
+ %JOB = ();
+ @JOBARGS = ();
+ %JOBENV = ();
+ foreach $line (@lines) {
+ $line =~ s/\\n/\n/;
+ $line =~ s/\\\\/\\/;
+ my @pair = split(/=/, $line, 2);
+ if ($pair[0] eq "arg") {
+ push @JOBARGS, $pair[1];
+ }
+ elsif ($pair[0] eq "env") {
+ my @ep = split(/=/, $pair[1], 2);
+ $JOBENV{"$ep[0]"} = $ep[1];
+ }
+ elsif ($pair[0] eq "identity") {
+ $JOBID = $pair[1];
+ }
+ else {
+ $JOB{$pair[0]} = $pair[1];
+ }
+ }
+ if (checkJob($tag)) {
+ sendCmd((\&nullCB, "JOBSTATUS", $JOBID, "$ACTIVE", "0", ""));
+ forkjob();
+ }
+}
+
+sub checkJob() {
+ my $tag = shift;
+ my $executable = $JOB{"executable"};
+ if (!(defined $JOBID)) {
+ sendReply($tag, ("Missing job identity"));
+ return 0;
+ }
+ elsif (!(defined $executable)) {
+ sendReply($tag, ("Missing executable"));
+ return 0;
+ }
+ else {
+ sendReply($tag, ("OK"));
+ return 1;
+ }
+}
+
+sub forkjob {
+ my ($pid, $status);
+ $pid = fork();
+ if (defined($pid)) {
+ if ($pid == 0) {
+ runjob();
+ }
+ else {
+ wlog "Forked process $pid. Waiting for its completion\n";
+ waitpid($pid, 0);
+ $status = $?;
+ wlog "Child process $pid terminated. Status is $status. $!\n";
+ queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$COMPLETED", "$status", "");
+ }
+ }
+ else {
+ queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$FAILED", "512", "Could not fork child process");
+ }
+}
+
+sub runjob {
+ my $executable = $JOB{"executable"};
+ my $stdout = $JOB{"stdout"};
+ my $stderr = $JOB{"stderr"};
+
+ wlog "Running $executable\n";
+ my $ename;
+ foreach $ename (keys %JOBENV) {
+ $ENV{$ename} = $JOBENV{$ename};
+ }
+ unshift @JOBARGS, $executable;
+ if (defined $stdout) {
+ close STDOUT;
+ open STDOUT, $stdout;
+ }
+ if (defined $stderr) {
+ close STDERR;
+ open STDERR, $stderr;
+ }
+ exec { $executable } @JOBARGS or queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$FAILED", "513", "Could not execute $executable: $!");
+ die "Could not execute $executable: $!";
+}
+
+my $MSG="0";
+
+my $myhost=`hostname -i`;
+$myhost =~ s/\s+$//;
+init();
+
+queueCmd(\®isterCB, "REGISTER", $ID, "wid://$ID");
+
+mainloop();
Property changes on: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
___________________________________________________________________
Name: svn:executable
+ *
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,25 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jan 19, 2008
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import org.globus.cog.abstraction.impl.execution.coaster.SubmitJobCommand;
+import org.globus.cog.karajan.workflow.service.AbstractRequestManager;
+import org.globus.cog.karajan.workflow.service.handlers.ChannelConfigurationHandler;
+import org.globus.cog.karajan.workflow.service.handlers.ShutdownHandler;
+import org.globus.cog.karajan.workflow.service.handlers.VersionHandler;
+
+public class CoasterRequestManager extends AbstractRequestManager {
+ public CoasterRequestManager() {
+ addHandler("VERSION", VersionHandler.class);
+ addHandler("CHANNELCONFIG", ChannelConfigurationHandler.class);
+ addHandler("SHUTDOWN", ShutdownHandler.class);
+ addHandler(SubmitJobCommand.NAME, SubmitJobHandler.class);
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,139 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jan 19, 2008
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.coaster.service.job.manager.JobQueue;
+import org.globus.cog.abstraction.coaster.service.local.JobStatusHandler;
+import org.globus.cog.abstraction.coaster.service.local.RegistrationHandler;
+import org.globus.cog.karajan.workflow.service.ConnectionHandler;
+import org.globus.cog.karajan.workflow.service.GSSService;
+import org.globus.cog.karajan.workflow.service.RequestManager;
+import org.globus.cog.karajan.workflow.service.ServiceRequestManager;
+import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+import org.globus.gsi.gssapi.auth.SelfAuthorization;
+
+public class CoasterService extends GSSService {
+ public static final Logger logger = Logger
+ .getLogger(CoasterService.class);
+
+ private String registrationURL, id;
+ private JobQueue jobQueue;
+ private LocalTCPService localService;
+ private Exception e;
+ private boolean done;
+
+ public CoasterService() throws IOException {
+ this(null, null);
+ }
+
+ public CoasterService(String registrationURL, String id)
+ throws IOException {
+ super();
+ this.registrationURL = registrationURL;
+ this.id = id;
+ setAuthorization(new SelfAuthorization());
+ RequestManager rm = new ServiceRequestManager();
+ rm.addHandler("REGISTER", RegistrationHandler.class);
+ rm.addHandler("JOBSTATUS", JobStatusHandler.class);
+ localService = new LocalTCPService(rm);
+ }
+
+ protected void handleConnection(Socket sock) {
+ logger.debug("Got connection");
+ try {
+ ConnectionHandler handler = new ConnectionHandler(this, sock,
+ new CoasterRequestManager());
+ handler.start();
+ }
+ catch (Exception e) {
+ logger.warn("Could not start connection handler", e);
+ }
+ }
+
+ public void start() {
+ super.start();
+ try {
+ localService.start();
+ jobQueue = new JobQueue(localService);
+ jobQueue.start();
+ localService.setWorkerManager(jobQueue.getWorkerManager());
+ logger.info("Started local service: " + localService.getContact());
+ if (id != null) {
+ try {
+ logger.info("Reserving channel for registration");
+ KarajanChannel channel = ChannelManager.getManager()
+ .reserveChannel(registrationURL, null);
+ logger.info("Sending registration");
+ RegistrationCommand reg = new RegistrationCommand(id,
+ "https://" + getHost() + ":" + getPort());
+ reg.execute(channel);
+ logger.info("Registration complete");
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to register service",
+ e);
+ }
+ }
+ logger.info("Started coaster service: " + this);
+ }
+ catch (Exception e) {
+ logger.error("Failed to start coaster service", e);
+ stop(e);
+ }
+ }
+
+ private void stop(Exception e) {
+ synchronized(this) {
+ this.e = e;
+ done = true;
+ notifyAll();
+ }
+ }
+
+ public void waitFor() throws Exception {
+ synchronized(this) {
+ while (!done) {
+ wait();
+ }
+ if (e != null) {
+ throw e;
+ }
+ }
+ }
+
+ public JobQueue getJobQueue() {
+ return jobQueue;
+ }
+
+ public static void main(String[] args) {
+ try {
+ CoasterService s;
+ if (args.length < 2) {
+ s = new CoasterService();
+ }
+ else {
+ s = new CoasterService(args[0], args[1]);
+ }
+ s.start();
+ //JobSubmissionTaskHandler.main(new String[0]);
+ s.waitFor();
+ System.exit(0);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/JobStatusCommand.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/JobStatusCommand.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/JobStatusCommand.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,66 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Feb 12, 2008
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+import org.globus.cog.abstraction.impl.common.execution.JobException;
+import org.globus.cog.abstraction.interfaces.Status;
+import org.globus.cog.karajan.workflow.service.ProtocolException;
+import org.globus.cog.karajan.workflow.service.commands.Command;
+
+public class JobStatusCommand extends Command {
+ public static final String NAME = "JOBSTATUS";
+
+ private String taskId;
+ private Status status;
+
+ public JobStatusCommand(String taskId, Status status) {
+ super(NAME);
+ this.taskId = taskId;
+ this.status = status;
+ }
+
+
+ public void send() throws ProtocolException {
+ try {
+ serialize();
+ }
+ catch (Exception e) {
+ throw new ProtocolException(
+ "Could not serialize status", e);
+ }
+ super.send();
+ }
+
+ protected void serialize() throws IOException {
+ addOutData(taskId);
+ addOutData(String.valueOf(status.getStatusCode()));
+ if (status.getException() instanceof JobException) {
+ addOutData(String.valueOf(((JobException) status.getException()).getExitCode()));
+ }
+ else {
+ addOutData("0");
+ }
+ StringBuffer sb = new StringBuffer();
+ if (status.getMessage() != null) {
+ sb.append(status.getMessage());
+ }
+ if (status.getException() != null) {
+ StringWriter sw = new StringWriter();
+ status.getException().printStackTrace(new PrintWriter(sw));
+ sb.append('\n');
+ sb.append(sw.toString());
+ }
+ addOutData(sb.toString());
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,67 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Feb 15, 2008
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.coaster.service.job.manager.WorkerManager;
+import org.globus.cog.karajan.workflow.service.GSSService;
+import org.globus.cog.karajan.workflow.service.RequestManager;
+import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
+import org.globus.cog.karajan.workflow.service.channels.ChannelException;
+import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+
+public class LocalTCPService extends GSSService implements Registering {
+ public static final Logger logger = Logger.getLogger(LocalTCPService.class);
+
+ private WorkerManager workerManager;
+
+ public LocalTCPService(RequestManager rm) throws IOException {
+ super(false, 0);
+ setRequestManager(rm);
+ }
+
+ public void registrationReceived(String id, String url, KarajanChannel channel) throws ChannelException {
+ if (logger.isInfoEnabled()) {
+ logger.info("Received registration: id = " + id + ", url = " + url);
+ }
+ ChannelContext cc = channel.getChannelContext();
+ cc.getChannelID().setLocalID("coaster");
+ cc.getChannelID().setRemoteID(id);
+ ChannelManager.getManager().registerChannel(cc.getChannelID(), channel);
+ workerManager.registrationReceived(id, url, channel.getChannelContext());
+ }
+
+ public WorkerManager getWorkerManager() {
+ return workerManager;
+ }
+
+ public void setWorkerManager(WorkerManager workerManager) {
+ this.workerManager = workerManager;
+ }
+
+ protected void handleConnection(Socket socket) {
+ try {
+ socket.setReceiveBufferSize(2048);
+ socket.setSendBufferSize(4096);
+ socket.setTcpNoDelay(true);
+ }
+ catch (SocketException e) {
+ e.printStackTrace();
+ }
+ super.handleConnection(socket);
+ }
+
+
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/Registering.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/Registering.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/Registering.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,17 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Feb 15, 2008
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import org.globus.cog.karajan.workflow.service.channels.ChannelException;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+
+public interface Registering {
+ void registrationReceived(String id, String url, KarajanChannel channel) throws ChannelException;
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/RegistrationCommand.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/RegistrationCommand.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/RegistrationCommand.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,22 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 20, 2005
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import org.globus.cog.abstraction.coaster.service.local.RegistrationHandler;
+import org.globus.cog.karajan.workflow.service.commands.Command;
+
+
+public class RegistrationCommand extends Command {
+ public RegistrationCommand(String id, String url) {
+ super(RegistrationHandler.NAME);
+ addOutData(id);
+ addOutData(url);
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,186 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 21, 2005
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.globus.cog.abstraction.coaster.service.job.manager.TaskNotifier;
+import org.globus.cog.abstraction.impl.common.IdentityImpl;
+import org.globus.cog.abstraction.impl.common.task.ExecutionServiceImpl;
+import org.globus.cog.abstraction.impl.common.task.IllegalSpecException;
+import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
+import org.globus.cog.abstraction.impl.common.task.ServiceContactImpl;
+import org.globus.cog.abstraction.impl.common.task.TaskImpl;
+import org.globus.cog.abstraction.interfaces.ExecutionService;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Task;
+import org.globus.cog.karajan.workflow.service.ProtocolException;
+import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
+import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
+import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
+
+public class SubmitJobHandler extends RequestHandler {
+ public void requestComplete() throws ProtocolException {
+ Task t;
+ try {
+ t = read(getInData(0));
+ ChannelContext channelContext = getChannel().getChannelContext();
+ new TaskNotifier(t, channelContext);
+ ((CoasterService) channelContext.getService()).getJobQueue()
+ .enqueue(t);
+ // make sure we'll have something to send notifications to
+ ChannelManager.getManager().reserveLongTerm(getChannel());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ throw new ProtocolException(
+ "Could not deserialize job description", e);
+ }
+ sendReply(t.getIdentity().toString());
+ }
+
+ private Task read(byte[] buf) throws IOException, ProtocolException,
+ IllegalSpecException {
+ Helper h = new Helper(buf);
+
+ Task t = new TaskImpl();
+ t.setType(Task.JOB_SUBMISSION);
+ JobSpecification spec = new JobSpecificationImpl();
+ t.setSpecification(spec);
+
+ String clientId = h.read("identity");
+ t.setIdentity(new IdentityImpl(clientId + "-"
+ + new IdentityImpl().getValue()));
+ spec.setExecutable(h.read("executable").intern());
+ spec.setDirectory(h.read("directory"));
+ spec.setStdInput(h.read("stdin"));
+ spec.setStdOutput(h.read("stdout"));
+ spec.setStdError(h.read("stderr"));
+ String s;
+ while ((s = h.read("arg")) != null) {
+ spec.addArgument(s);
+ }
+
+ while ((s = h.read("env")) != null) {
+ spec.addEnvironmentVariable(getKey(s), getValue(s));
+ }
+
+ while ((s = h.read("attr")) != null) {
+ spec.setAttribute(getKey(s), getValue(s));
+ }
+
+ ExecutionService service = new ExecutionServiceImpl();
+
+ setServiceParams(service, h.read("contact"), h.read("provider"), h
+ .read("jm").intern());
+ t.setService(0, service);
+
+ return t;
+ }
+
+ protected void setServiceParams(ExecutionService s, String contact,
+ String provider, String jm) throws IllegalSpecException {
+ if (jm == null) {
+ jm = "fork";
+ }
+
+ if (jm.equalsIgnoreCase("fork")) {
+ s.setProvider("local");
+ }
+ else {
+ s.setProvider("coaster");
+ s.setJobManager(jm);
+ }
+ s.setServiceContact(new ServiceContactImpl(contact));
+ }
+
+ private String getKey(String s) throws ProtocolException {
+ int i = s.indexOf('=');
+ if (i == -1) {
+ throw new ProtocolException("Invalid value: " + s);
+ }
+ return s.substring(0, i);
+ }
+
+ private String getValue(String s) {
+ int i = s.indexOf('=');
+ return s.substring(i + 1);
+ }
+
+ private static class Helper {
+ private InputStream is;
+ private String key, value;
+
+ public Helper(byte[] buf) {
+ is = new ByteArrayInputStream(buf);
+ }
+
+ public String read(String key) throws IOException, ProtocolException {
+ if (this.key == null) {
+ scan();
+ }
+ if (key.equals(this.key)) {
+ this.key = null;
+ return this.value;
+ }
+ else {
+ return null;
+ }
+ }
+
+ private void scan() throws IOException, ProtocolException {
+ key = null;
+ value = null;
+ StringBuffer sb = new StringBuffer();
+ int c = is.read();
+ boolean nl = false;
+ while (value == null) {
+ switch (c) {
+ case '=': {
+ if (key == null) {
+ key = sb.toString();
+ sb = new StringBuffer();
+ }
+ else {
+ sb.append((char) c);
+ }
+ break;
+ }
+ case '\\': {
+ c = is.read();
+ if (c == 'n') {
+ sb.append('\n');
+ }
+ else if (c == '\\') {
+ sb.append('\\');
+ }
+ break;
+ }
+ case -1:
+ case '\n': {
+ if (key == null) {
+ throw new ProtocolException("Invalid line: "
+ + sb.toString());
+ }
+ else {
+ value = sb.toString();
+ }
+ return;
+ }
+ default:
+ sb.append((char) c);
+ }
+ c = is.read();
+ }
+ }
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/VersionCommand.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/VersionCommand.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/VersionCommand.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,24 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 20, 2005
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import org.globus.cog.abstraction.coaster.service.local.VersionHandler;
+import org.globus.cog.karajan.workflow.service.commands.Command;
+
+
+public class VersionCommand extends Command {
+ public VersionCommand() {
+ super(VersionHandler.NAME);
+ }
+
+ public String getServerVersion() {
+ return new String(getInData());
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,33 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Feb 13, 2008
+ */
+package org.globus.cog.abstraction.coaster.service.job.manager;
+
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Task;
+
+public class AssociatedTask {
+ public final Task task;
+ public final WallTime maxWallTime;
+
+ public AssociatedTask(Task task) {
+ this.task = task;
+ this.maxWallTime = getMaxWallTime(task);
+ }
+
+ private WallTime getMaxWallTime(Task t) {
+ Object wt = ((JobSpecification) t.getSpecification()).getAttribute("maxwalltime");
+ if (wt == null) {
+ return new WallTime("10");
+ }
+ else {
+ return new WallTime(wt.toString());
+ }
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/CoasterQueueProcessor.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/CoasterQueueProcessor.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/CoasterQueueProcessor.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,67 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Feb 13, 2008
+ */
+package org.globus.cog.abstraction.coaster.service.job.manager;
+
+import java.io.IOException;
+
+import org.globus.cog.abstraction.impl.common.StatusImpl;
+import org.globus.cog.abstraction.impl.common.task.ServiceContactImpl;
+import org.globus.cog.abstraction.interfaces.Status;
+import org.globus.cog.abstraction.interfaces.TaskHandler;
+
+public class CoasterQueueProcessor extends QueueProcessor {
+ private TaskHandler taskHandler;
+ private WorkerManager workerManager;
+ private String workdir;
+
+ public CoasterQueueProcessor(WorkerManager workerManager)
+ throws IOException {
+ super("Coaster Queue Processor");
+ this.workerManager = workerManager;
+ this.taskHandler = new CoasterTaskHandler(workerManager);
+ }
+
+ public void run() {
+ try {
+ workerManager.start();
+ AssociatedTask at;
+ while (!this.getShutdownFlag()) {
+ at = next();
+ Worker wr = workerManager.request(at.maxWallTime, at.task);
+ if (wr != null) {
+ remove();
+ if (wr.getStatus() != null) {
+ at.task.setStatus(wr.getStatus());
+ }
+ else {
+ try {
+ at.task.getService(0).setServiceContact(
+ new ServiceContactImpl(wr.getId()));
+ new WorkerTaskMonitor(at.task, workerManager, wr);
+ taskHandler.submit(at.task);
+ }
+ catch (Exception e) {
+ at.task.setStatus(new StatusImpl(Status.FAILED,
+ null, e));
+ }
+ }
+ }
+ if (hasWrapped()) {
+ synchronized(workerManager) {
+ workerManager.wa...
[truncated message content] |