You can subscribe to this list here.
| 2006 | Jan | Feb | Mar | Apr (39) | May (165) | Jun (164) | Jul (127) | Aug (81) | Sep (146) | Oct (375) | Nov (241) | Dec (77) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2007 | Jan (42) | Feb (38) | Mar (30) | Apr (6) | May (17) | Jun | Jul (15) | Aug (59) | Sep (31) | Oct (44) | Nov (30) | Dec (12) |
| 2008 | Jan (9) | Feb (63) | Mar (18) | Apr (43) | May (28) | Jun (32) | Jul (61) | Aug (5) | Sep (72) | Oct (48) | Nov (6) | Dec |
Revision: 1974
http://cogkit.svn.sourceforge.net/cogkit/?rev=1974&view=rev
Author: hategan
Date: 2008-04-22 15:12:02 -0700 (Tue, 22 Apr 2008)
Log Message:
-----------
autostart enabled by default
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-22 20:52:34 UTC (rev 1973)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-22 22:12:02 UTC (rev 1974)
@@ -54,7 +54,7 @@
private boolean autostart;
public JobSubmissionTaskHandler() {
- this.autostart = false;
+ this.autostart = true;
bootHandlers = new HashMap();
}
@@ -214,18 +214,18 @@
js.addArgument("0");
t.setSpecification(js);
ExecutionService s = new ExecutionServiceImpl();
- // s.setServiceContact(new ServiceContactImpl("localhost"));
- s.setServiceContact(new ServiceContactImpl("tp-grid1.ci.uchicago.edu"));
+ s.setServiceContact(new ServiceContactImpl("localhost"));
+ //s.setServiceContact(new ServiceContactImpl("tp-grid1.ci.uchicago.edu"));
// s.setServiceContact(new ServiceContactImpl("localhost:50013"));
s.setProvider("coaster");
- //s.setJobManager("local:local");
- s.setJobManager("gt2:pbs");
+ s.setJobManager("local:local");
+ //s.setJobManager("gt2:pbs");
s.setSecurityContext(new SecurityContextImpl());
t.setService(0, s);
// JobSubmissionTaskHandler th = new JobSubmissionTaskHandler(
// AbstractionFactory.newExecutionTaskHandler("local"));
JobSubmissionTaskHandler th = new JobSubmissionTaskHandler();
- th.setAutostart(true);
+ //th.setAutostart(true);
th.submit(t);
return t;
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-22 20:13:46
|
Revision: 1972
http://cogkit.svn.sourceforge.net/cogkit/?rev=1972&view=rev
Author: hategan
Date: 2008-04-22 13:13:20 -0700 (Tue, 22 Apr 2008)
Log Message:
-----------
added some logging
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java 2008-04-22 18:13:19 UTC (rev 1971)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java 2008-04-22 20:13:20 UTC (rev 1972)
@@ -34,6 +34,9 @@
}
public void start() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Starting local service");
+ }
setAuthorization(new SelfAuthorization());
services = new HashMap();
this.accept = true;
@@ -63,6 +66,9 @@
public String waitForRegistration(Task t, String id, long timeout)
throws InterruptedException, IOException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Waiting for registration from service " + id);
+ }
long start = System.currentTimeMillis();
synchronized (services) {
while (!services.containsKey(id)) {
@@ -83,6 +89,9 @@
}
public void registrationReceived(String id, String url, KarajanChannel channel) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received registration from service " + id + ": " + url);
+ }
synchronized (services) {
if (services.containsKey(id)) {
throw new IllegalArgumentException(
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java 2008-04-22 18:13:19 UTC (rev 1971)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java 2008-04-22 20:13:20 UTC (rev 1972)
@@ -287,5 +287,4 @@
System.err.println(message);
System.exit(1);
}
-
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-22 18:13:19 UTC (rev 1971)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-22 20:13:20 UTC (rev 1972)
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.log4j.Logger;
import org.globus.cog.abstraction.coaster.service.local.LocalService;
import org.globus.cog.abstraction.impl.common.task.ExecutionServiceImpl;
import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
@@ -34,6 +35,9 @@
import org.globus.cog.abstraction.interfaces.TaskHandler;
public class ServiceManager {
+ public static final Logger logger = Logger
+ .getLogger(ServiceManager.class);
+
public static final String BOOTSTRAP_SCRIPT = "bootstrap.sh";
public static final String BOOTSTRAP_JAR = "coaster-bootstrap.jar";
public static final String BOOTSTRAP_LIST = "coaster-bootstrap.list";
@@ -65,6 +69,9 @@
public String reserveService(Task task, TaskHandler bootHandler)
throws TaskSubmissionException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Reserving service for " + task);
+ }
try {
Object service = getService(task);
// beah. it's impossible to nicely abstract both concurrency
@@ -100,6 +107,10 @@
try {
startLocalService();
Task t = buildTask(task);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Starting coaster service on "
+ + task.getService(0) + ". Task is " + t);
+ }
bootHandler.submit(t);
String url = localService.waitForRegistration(t, (String) t
.getAttribute(TASK_ATTR_ID));
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-22 18:13:24
|
Revision: 1971
http://cogkit.svn.sourceforge.net/cogkit/?rev=1971&view=rev
Author: hategan
Date: 2008-04-22 11:13:19 -0700 (Tue, 22 Apr 2008)
Log Message:
-----------
fixes for port ranges and ip addresses
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/BootstrapService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/PortManager.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java 2008-04-22 16:03:56 UTC (rev 1970)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java 2008-04-22 18:13:19 UTC (rev 1971)
@@ -216,7 +216,7 @@
private void addProperty(List args, String name) {
String value = System.getProperty(name);
- if (value != null) {
+ if (value != null && !value.equals("")) {
args.add("-D" + name + "=" + value);
}
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/BootstrapService.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/BootstrapService.java 2008-04-22 16:03:56 UTC (rev 1970)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/BootstrapService.java 2008-04-22 18:13:19 UTC (rev 1971)
@@ -161,8 +161,13 @@
}
else {
ServerSocket socket = channel.socket();
- return "http://" + CoGProperties.getDefault().getIPAddress()
+ if (CoGProperties.getDefault().getIPAddress() != null) {
+ return "http://" + CoGProperties.getDefault().getIPAddress()
+ ":" + socket.getLocalPort();
+ }
+ else {
+ return "http://localhost:" + socket.getLocalPort();
+ }
}
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/PortManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/PortManager.java 2008-04-22 16:03:56 UTC (rev 1970)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/PortManager.java 2008-04-22 18:13:19 UTC (rev 1971)
@@ -18,26 +18,26 @@
public class PortManager {
private static PortManager portManager;
-
+
public synchronized static PortManager getDefault() {
if (portManager == null) {
portManager = new PortManager();
}
return portManager;
}
-
+
private PortRange portRange;
-
+
protected PortManager() {
portRange = PortRange.getTcpInstance();
}
-
+
public ServerSocketChannel openServerSocketChannel() throws IOException {
ServerSocketChannel s = ServerSocketChannel.open();
bind(s.socket());
return s;
}
-
+
public void close(ServerSocketChannel s) throws IOException {
s.close();
portRange.free(s.socket().getLocalPort());
@@ -45,16 +45,23 @@
private void bind(ServerSocket socket) throws IOException {
int crt = 0;
- while(true) {
- crt = portRange.getFreePort(crt);
- try {
- socket.bind(new InetSocketAddress(crt));
- portRange.setUsed(crt);
- return;
+ if (portRange.isEnabled()) {
+ while (true) {
+ crt = portRange.getFreePort(crt);
+
+ try {
+ socket.bind(new InetSocketAddress(crt));
+
+ portRange.setUsed(crt);
+ return;
+ }
+ catch (IOException e) {
+ crt++;
+ }
}
- catch (IOException e) {
- crt++;
- }
}
+ else {
+ socket.bind(null);
+ }
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-22 16:04:06
|
Revision: 1970
http://cogkit.svn.sourceforge.net/cogkit/?rev=1970&view=rev
Author: hategan
Date: 2008-04-22 09:03:56 -0700 (Tue, 22 Apr 2008)
Log Message:
-----------
deleted unused libexec dir
Removed Paths:
-------------
trunk/current/src/cog/modules/provider-coaster/libexec/
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-22 16:02:54
|
Revision: 1969
http://cogkit.svn.sourceforge.net/cogkit/?rev=1969&view=rev
Author: hategan
Date: 2008-04-22 09:02:49 -0700 (Tue, 22 Apr 2008)
Log Message:
-----------
moved bootstrap script to the right place
Added Paths:
-----------
trunk/current/src/cog/modules/provider-coaster/resources/bootstrap.sh
Added: trunk/current/src/cog/modules/provider-coaster/resources/bootstrap.sh
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/bootstrap.sh (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/resources/bootstrap.sh 2008-04-22 16:02:49 UTC (rev 1969)
@@ -0,0 +1,45 @@
+BS=$0
+LS=$1
+EMD5=$2
+LMD5=$3
+ID=$4
+H=$5
+L=$6
+error() {
+ echo $1
+ echo $1 >>$L
+ rm -f $DJ
+ exit 1
+}
+if [ "$L" == "" ]; then
+ L=~/coaster-boot-$ID.log
+fi
+DJ=`mktemp bootstrap.XXXXXX`
+echo "BS: $BS" >>$L
+wget -c -q $BS/coaster-bootstrap.jar -O $DJ >>$L 2>&1
+if [ "$?" != "0" ]; then
+ error "Failed to download bootstrap jar from $BS"
+fi
+AMD5=`/usr/bin/md5sum $DJ`
+echo "Expected checksum: $EMD5" >>$L
+echo "Computed checksum: ${AMD5:0:32}" >>$L
+if [ "${AMD5:0:32}" != "$EMD5" ]; then
+ error "Bootstrap jar checksum failed: $EMD5 != ${AMD5:0:32}"
+fi
+
+if [ "$JAVA_HOME" != "" ]; then
+ JAVA=$JAVA_HOME/bin/java
+else
+ JAVA=`which java`
+fi
+echo "JAVA=$JAVA" >>$L
+if [ -x $JAVA ]; then
+ echo "$JAVA -Djava.home="$JAVA_HOME" -DX509_USER_PROXY="$X509_USER_PROXY" -DGLOBUS_HOSTNAME="$H" -jar $DJ $BS $LMD5 $LS $ID" >>$L
+ $JAVA -Djava.home="$JAVA_HOME" -DX509_USER_PROXY="$X509_USER_PROXY" -DGLOBUS_HOSTNAME="$H" -jar $DJ $BS $LMD5 $LS $ID >>$L 2>&1
+ EC=$?
+ echo "Exit code: $EC" >>$L
+ rm -f $DJ
+ exit $EC
+else
+ error "Could not find a valid java executable"
+fi
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-22 14:09:46
|
Revision: 1968
http://cogkit.svn.sourceforge.net/cogkit/?rev=1968&view=rev
Author: hategan
Date: 2008-04-22 07:09:31 -0700 (Tue, 22 Apr 2008)
Log Message:
-----------
updated provider props
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/resources/cog-provider.properties
Modified: trunk/current/src/cog/modules/provider-coaster/resources/cog-provider.properties
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/cog-provider.properties 2008-04-22 14:05:35 UTC (rev 1967)
+++ trunk/current/src/cog/modules/provider-coaster/resources/cog-provider.properties 2008-04-22 14:09:31 UTC (rev 1968)
@@ -1,6 +1,6 @@
provider=coaster
sandbox=false
-executionTaskHandler=org.globus.cog.abstraction.impl.execution.local.TaskHandlerImpl
+executionTaskHandler=org.globus.cog.abstraction.impl.execution.coaster.TaskHandlerImpl
securityContext=org.globus.cog.abstraction.impl.common.task.SecurityContextImpl
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-22 14:05:38
|
Revision: 1967
http://cogkit.svn.sourceforge.net/cogkit/?rev=1967&view=rev
Author: hategan
Date: 2008-04-22 07:05:35 -0700 (Tue, 22 Apr 2008)
Log Message:
-----------
added coaster provider
Added Paths:
-----------
trunk/current/src/cog/modules/provider-coaster/
trunk/current/src/cog/modules/provider-coaster/CHANGES.txt
trunk/current/src/cog/modules/provider-coaster/build.xml
trunk/current/src/cog/modules/provider-coaster/dependencies.xml
trunk/current/src/cog/modules/provider-coaster/etc/
trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.head
trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.tail
trunk/current/src/cog/modules/provider-coaster/etc/log4j.properties.module
trunk/current/src/cog/modules/provider-coaster/launchers.xml
trunk/current/src/cog/modules/provider-coaster/lib/
trunk/current/src/cog/modules/provider-coaster/lib/index.html
trunk/current/src/cog/modules/provider-coaster/libexec/
trunk/current/src/cog/modules/provider-coaster/libexec/bootstrap.sh
trunk/current/src/cog/modules/provider-coaster/meta/
trunk/current/src/cog/modules/provider-coaster/meta/description.txt
trunk/current/src/cog/modules/provider-coaster/project.properties
trunk/current/src/cog/modules/provider-coaster/resources/
trunk/current/src/cog/modules/provider-coaster/resources/coaster-bootstrap.list
trunk/current/src/cog/modules/provider-coaster/resources/cog-provider.properties
trunk/current/src/cog/modules/provider-coaster/resources/log4j.properties
trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
trunk/current/src/cog/modules/provider-coaster/src/
trunk/current/src/cog/modules/provider-coaster/src/org/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/JobStatusCommand.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/Registering.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/RegistrationCommand.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/VersionCommand.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/CoasterQueueProcessor.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/CoasterTaskHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/IDGenerator.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/JobQueue.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/LocalQueueProcessor.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/LocalTaskHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/QueueProcessor.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TCPTest.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/TaskNotifier.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WallTime.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerKey.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerTaskMonitor.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalRequestManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/RegistrationHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/UnregisterHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/VersionHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/BootstrapService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/CancelJobCommand.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/CoasterChannelManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Digester.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/PackageList.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/PortManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/SubmitJobCommand.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/TaskHandlerImpl.java
Added: trunk/current/src/cog/modules/provider-coaster/CHANGES.txt
===================================================================
Added: trunk/current/src/cog/modules/provider-coaster/build.xml
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/build.xml (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/build.xml 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,178 @@
+<project name="Java CoG Kit" default="dist" basedir=".">
+
+ <property file="project.properties"/>
+ <property name="cog.dir" value="${basedir}/../../"/>
+ <property name="main.buildfile" value="${cog.dir}/mbuild.xml"/>
+ <property name="dist.dir" value="${cog.dir}/modules/${module.name}/dist/${module.name}-${version}"/>
+ <property name="build.dir" value="${cog.dir}/modules/${module.name}/build"/>
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Help | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="help">
+ <echo>
+ Available targets:
+ help:
+ prints out this help message
+
+ dist:
+ creates a distribution directory of the
+ ${project} ${long.name}
+
+ jar:
+ creates a jar file for the ${project} ${long.name}
+ named ${jar.filename}
+
+ javadoc:
+ creates the documentation
+
+ clean:
+ removes the compiled classes
+
+ distclean:
+ deletes the distribution directory
+
+ all:
+ dist and javadoc
+
+ deploy.webstart:
+ deploys the module as a webstart application
+
+ dist.joint:
+ builds everything into one jar file. Should only
+ be used globally (from all)
+
+ fixeol:
+ change newlines to the unix standard
+ </echo>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Dist | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="dist">
+ <ant antfile="${main.buildfile}" target="dist"/>
+ <ant antfile="${cog.dir}/modules/${module.name}/build.xml" target="bootstrap.jar"/>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Compile | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="compile">
+ <ant antfile="${main.buildfile}" target="compile"/>
+ </target>
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Clean | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="clean">
+ <ant antfile="${main.buildfile}" target="clean"/>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Distclean | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="distclean">
+ <ant antfile="${main.buildfile}" target="distclean"/>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Jar | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="jar">
+ <ant antfile="${main.buildfile}" target="jar"/>
+ </target>
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | Javadoc | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="javadoc">
+ <ant antfile="${main.buildfile}" target="javadoc"/>
+ </target>
+
+
+
+ <!-- _________________________________________________________________ -->
+ <!-- / \ -->
+ <!-- | PMD | -->
+ <!-- \_________________________________________________________________/ -->
+
+ <target name="pmd">
+ <ant antfile="${main.buildfile}" target="pmd"/>
+ </target>
+
+ <target name="deploy.webstart">
+ <ant antfile="${main.buildfile}" target="deploy.webstart"/>
+ </target>
+
+ <target name="replacelibs">
+ <ant antfile="${main.buildfile}" target="replacelibs"/>
+ </target>
+
+ <target name="webstart.launchers">
+ <ant antfile="${main.buildfile}" target="webstart.launchers"/>
+ </target>
+
+ <target name="dist.joint">
+ <ant antfile="${main.buildfile}" target="dist.all"/>
+ </target>
+
+ <target name="module.package">
+ <ant antfile="${main.buildfile}" target="module.package"/>
+ </target>
+
+ <!-- ================================================ -->
+ <!-- fixeol -->
+ <!-- ================================================ -->
+
+ <target name="fixeol">
+ <ant antfile="${main.buildfile}" target="fixeol"/>
+ </target>
+
+ <target name="bootstrap.jar" depends="package.list">
+ <echo message="[${module.name}]: BOOTSTRAP JAR"/>
+ <mkdir dir="${build.dir}/etc.tmp"/>
+ <concat destfile="${build.dir}/etc.tmp/MANIFEST.MF">
+ <filelist dir="etc" files="MANIFEST.MF.head,MANIFEST.MF.tail"/>
+ </concat>
+ <jar jarfile="${dist.dir}/lib/coaster-bootstrap.jar" manifest="${build.dir}/etc.tmp/MANIFEST.MF" index="true">
+ <fileset dir="${build.dir}" includes="**/*.*" excludes="*.tmp, *.tmp/**, *.tmp/**/*.*"/>
+ </jar>
+ </target>
+
+ <target name="package.list">
+ <echo message="[${module.name}]: PACKAGE LIST"/>
+ <java
+ classpath="${build.dir}"
+ classname="org.globus.cog.abstraction.impl.execution.coaster.PackageList"
+ fork="true" output="${build.dir}/coaster-bootstrap.list" logError="true" failonerror="true">
+
+ <arg value="${dist.dir}/lib"/>
+ </java>
+ <copy file="${build.dir}/coaster-bootstrap.list" tofile="${dist.dir}/lib/coaster-bootstrap.list"/>
+ <copy file="${build.dir}/coaster-bootstrap.list" tofile="${cog.dir}/modules/${module.name}/lib/coaster-bootstrap.list"/>
+ <copy file="${build.dir}/coaster-bootstrap.list" tofile="${cog.dir}/modules/${module.name}/resources/coaster-bootstrap.list"/>
+ </target>
+
+</project>
+
+
Added: trunk/current/src/cog/modules/provider-coaster/dependencies.xml
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/dependencies.xml (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/dependencies.xml 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,14 @@
+<project name="Project dependencies" default="deps" basedir=".">
+ <!-- project dependencies -->
+ <target name="deps">
+ <ant antfile="${main.buildfile}" target="dep">
+ <property name="module" value="abstraction-common"/>
+ </ant>
+ <ant antfile="${main.buildfile}" target="dep">
+ <property name="module" value="jglobus"/>
+ </ant>
+ <ant antfile="${main.buildfile}" target="dep">
+ <property name="module" value="karajan"/>
+ </ant>
+ </target>
+</project>
Added: trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.head
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.head (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.head 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1 @@
+Manifest-Version: 1.0
Added: trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.tail
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.tail (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/etc/MANIFEST.MF.tail 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1 @@
+Main-Class: org.globus.cog.abstraction.impl.execution.coaster.Bootstrap
Added: trunk/current/src/cog/modules/provider-coaster/etc/log4j.properties.module
===================================================================
Added: trunk/current/src/cog/modules/provider-coaster/launchers.xml
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/launchers.xml (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/launchers.xml 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,6 @@
+<project name="Launchers" default="create" basedir=".">
+ <target name="create">
+ </target>
+ <target name="webstart">
+ </target>
+</project>
Added: trunk/current/src/cog/modules/provider-coaster/lib/index.html
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/lib/index.html (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/lib/index.html 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,10 @@
+<html>
+ <head>
+ <title>Coaster Bootstrap Service</title>
+ </head>
+ <body>
+ <p>
+ Go away!
+ </p>
+ </body>
+</html>
Added: trunk/current/src/cog/modules/provider-coaster/libexec/bootstrap.sh
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/libexec/bootstrap.sh (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/libexec/bootstrap.sh 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,45 @@
+BS=$0
+LS=$1
+EMD5=$2
+LMD5=$3
+ID=$4
+H=$5
+L=$6
+error() {
+ echo $1
+ echo $1 >>$L
+ rm -f $DJ
+ exit 1
+}
+if [ "$L" == "" ]; then
+ L=~/coaster-boot-$ID.log
+fi
+DJ=`mktemp bootstrap.XXXXXX`
+echo "BS: $BS" >>$L
+wget -c -q $BS/coaster-bootstrap.jar -O $DJ >>$L 2>&1
+if [ "$?" != "0" ]; then
+ error "Failed to download bootstrap jar from $BS"
+fi
+AMD5=`/usr/bin/md5sum $DJ`
+echo "Expected checksum: $EMD5" >>$L
+echo "Computed checksum: ${AMD5:0:32}" >>$L
+if [ "${AMD5:0:32}" != "$EMD5" ]; then
+ error "Bootstrap jar checksum failed: $EMD5 != ${AMD5:0:32}"
+fi
+
+if [ "$JAVA_HOME" != "" ]; then
+ JAVA=$JAVA_HOME/bin/java
+else
+ JAVA=`which java`
+fi
+echo "JAVA=$JAVA" >>$L
+if [ -x $JAVA ]; then
+ echo "$JAVA -Djava.home="$JAVA_HOME" -DX509_USER_PROXY="$X509_USER_PROXY" -DGLOBUS_HOSTNAME="$H" -jar $DJ $BS $LMD5 $LS $ID" >>$L
+ $JAVA -Djava.home="$JAVA_HOME" -DX509_USER_PROXY="$X509_USER_PROXY" -DGLOBUS_HOSTNAME="$H" -jar $DJ $BS $LMD5 $LS $ID >>$L 2>&1
+ EC=$?
+ echo "Exit code: $EC" >>$L
+ rm -f $DJ
+ exit $EC
+else
+ error "Could not find a valid java executable"
+fi
Added: trunk/current/src/cog/modules/provider-coaster/meta/description.txt
===================================================================
Added: trunk/current/src/cog/modules/provider-coaster/project.properties
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/project.properties (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/project.properties 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,7 @@
+module.name = provider-coaster
+long.name = Coasters
+version = 0.1
+project = Java CoG Kit
+lib.deps = -
+debug = true
+
Added: trunk/current/src/cog/modules/provider-coaster/resources/coaster-bootstrap.list
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/coaster-bootstrap.list (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/resources/coaster-bootstrap.list 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,22 @@
+backport-util-concurrent.jar f9c59530e5d6ca38f3ba6c0b6213e016
+cog-abstraction-common-2.2.jar dcc9dc2c0d73f4d9e0c1e62877e4c26a
+cog-jglobus-dev-080222.jar d87a8fb09be6d8011f6492feabce475d
+cog-karajan-0.36-dev.jar 7a2e83affcdc1ecb9bc771d71e4a3181
+cog-provider-coaster-0.1.jar 79a39f5d73aa2d09c59b1106f8eea4cf
+cog-provider-gt2-2.3.jar cb42a53716cfcbdba71321f58ee80337
+cog-provider-gt4_0_0-2.4.jar 48d9507bae0f958d0a8bdfdefbce7582
+cog-provider-local-2.1.jar 9294397d24948bd5dda8759e52e4c22e
+cog-provider-localscheduler-0.2.jar 3a2e8d438d3338cad0afa3e77ebec2cd
+cog-provider-ssh-2.3.jar e798e3e843f45217f555fe6ef17a4345
+cog-util-0.92.jar ad5946aa250148262f1d15474cc34ecc
+commons-logging-1.1.jar 6b62417e77b000a87de66ee3935edbf5
+cryptix-asn1.jar 87c4cf848c81d102bd29e33681b80e8a
+cryptix.jar c3dad86be114c7aaf2ddf32c8e52184a
+cryptix32.jar 59772ad239684bf10ae8fe71f4dbae22
+j2ssh-common-0.2.2.jar d65a51ea6f64efc066915c1618c613ca
+j2ssh-core-0.2.2-patched.jar 9bf1ffb8ab700234649f70ef4a35f029
+jaxrpc.jar 8e7d80b5d77dff6ed2f41352e9147101
+jce-jdk13-131.jar 06fc7049669d16c4001a452e100b401f
+jgss.jar 9cccfd21259791b509af229a0181f207
+log4j-1.2.8.jar 18a4ca847248e5b8606325684342701c
+puretls.jar 90b9c31c201243b9f4a24fa11d404702
Added: trunk/current/src/cog/modules/provider-coaster/resources/cog-provider.properties
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/cog-provider.properties (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/resources/cog-provider.properties 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,6 @@
+provider=coaster
+sandbox=false
+executionTaskHandler=org.globus.cog.abstraction.impl.execution.local.TaskHandlerImpl
+securityContext=org.globus.cog.abstraction.impl.common.task.SecurityContextImpl
+
+
Added: trunk/current/src/cog/modules/provider-coaster/resources/log4j.properties
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/log4j.properties (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/resources/log4j.properties 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,19 @@
+# Set root category priority to WARN and its only appender to CONSOLE.
+log4j.rootCategory=WARN, CONSOLE
+
+# A1 is set to be a ConsoleAppender.
+#log4j.appender.DEBUG=org.apache.log4j.ConsoleAppender
+
+# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+#log4j.appender.CONSOLE.Threshold=WARN
+log4j.appender.CONSOLE.layout.ConversionPattern=[%C{1}] %-5p %t %x - %m%n
+#log4j.appender.CONSOLE.layout.ConversionPattern=%-5p [%C{1}] %x - %m%n
+
+
+log4j.logger.org.globus.cog.abstraction=WARN
+log4j.logger.org.apache.axis.utils.JavaUtils=ERROR
+log4j.logger.org.globus.cog.abstraction.impl.execution.coaster.JobSubmissionTaskHandler=INFO
+log4j.logger.org.globus.cog.karajan.workflow.service=DEBUG
+log4j.logger.org.globus.cog.abstraction.coaster=INFO
\ No newline at end of file
Added: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/worker.pl (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,412 @@
+#!/usr/bin/perl
+use IO::Socket;
+use strict;
+use warnings;
+
+
+my $REPLY_FLAG = 0x00000001;
+my $FINAL_FLAG = 0x00000002;
+my $ERROR_FLAG = 0x00000004;
+
+my $COMPLETED = 7;
+my $FAILED = 5;
+my $ACTIVE = 2;
+
+my $TAG = 0;
+my $RETRIES = 3;
+my $REPLYTIMEOUT = 60;
+my $MAXFRAGS = 16;
+
+my $IDLETIMEOUT = 300; #Seconds
+my $LASTRECV = 0;
+
+my $BUFSZ = 2048;
+
+my %REQUESTS = ();
+my %REPLIES = ();
+
+my $LOG = "$ENV{HOME}/worker$ARGV[0].log";
+
+
+my %HANDLERS = (
+ "SHUTDOWN" => \&shutdown,
+ "SUBMITJOB" => \&submitjob,
+ "REGISTER" => \&register,
+);
+
+my @CMDQ = ();
+
+my $ID=$ARGV[0];
+my $URI=$ARGV[1];
+my $SCHEME;
+my $HOSTNAME;
+my $PORT;
+if ($URI =~ /(.*):\/\//) { $SCHEME = $1; } else { die "Could not parse url scheme: $URI"; }
+if ($URI =~ /.*:\/\/(.*):/) { $HOSTNAME = $1; } else { die "Could not parse url hostname: $URI"; }
+if ($URI =~ /.*:\/\/.*:(.*)/) { $PORT = $1; } else { die "Could not parse url port: $URI"; }
+my $SOCK;
+
+my $JOBID;
+my %JOB;
+my %JOBENV;
+my @JOBARGS;
+
+sub wlog {
+ my $msg;
+ foreach $msg (@_) {
+ print LOG time(), " ", $msg;
+ #print $msg;
+ }
+}
+
+sub init() {
+ my $fail = 0;
+
+ open(LOG, ">$LOG") or die "Failed to open log file: $!";
+ my $b = select(LOG);
+ $| = 1;
+ select($b);
+ print LOG time(), " Logging started\n";
+
+ wlog "uri=$URI, scheme=$SCHEME, host=$HOSTNAME, port=$PORT, id=$ID\n";
+ for ($_ = 0; $_ < 10; $_++) {
+ $SOCK = IO::Socket::INET->new(Proto=>'tcp', PeerAddr=>$HOSTNAME, PeerPort=>$PORT) || ($fail = 1);
+ if (!$fail) {
+ $SOCK->setsockopt(SOL_SOCKET, SO_RCVBUF, 16384);
+ $SOCK->setsockopt(SOL_SOCKET, SO_SNDBUF, 32768);
+ last;
+ }
+ }
+ if ($fail) {
+ die "Failed to create sockets: $!";
+ }
+}
+
+
+sub sendm {
+ my ($tag, $flags, $msg) = @_;
+ my $len = length($msg);
+ my $buf = pack("VVV", $tag, $flags, $len);
+ $buf = $buf.$msg;
+ wlog("> len=$len, tag=$tag, flags=$flags, $msg\n");
+ $SOCK->send($buf) == length($buf) or die "cannot send to $SOCK: $!";
+}
+
+sub sendFrags {
+ my ($tag, $flg, @msgs) = @_;
+
+ for (my $i = 0; $i <= $#msgs; $i++) {
+ sendm($tag, ($i < $#msgs) ? $flg : ($FINAL_FLAG | $flg), $msgs[$i]);
+ }
+}
+
+sub sendCmd {
+ my @cmd = @_;
+ my $cont = shift(@cmd);
+ my $ctag = $TAG++;
+
+ registerCmd($ctag, $cont);
+ sendFrags($ctag, 0, @cmd);
+}
+
+sub sendReply {
+ my ($tag, @msgs) = @_;
+
+ sendFrags($tag, $REPLY_FLAG, @msgs);
+}
+
+sub sendError {
+ my ($tag, @msgs) = @_;
+
+ sendFrags($tag, $REPLY_FLAG | $ERROR_FLAG, @msgs);
+}
+
+sub unpackData {
+ my ($data) = @_;
+
+ my $lendata = length($data);
+ if ($lendata < 12) {
+ die "Received faulty message (length < 12: $lendata)";
+ }
+ my $tag = unpack("V", substr($data, 0, 4));
+ my $flg = unpack("V", substr($data, 4, 4));
+ my $len = unpack("V", substr($data, 8, 4));
+ my $msg;
+ $SOCK->recv($msg, $len);
+
+ wlog("< len=$len, tag=$tag, flags=$flg, $data\n");
+ return ($tag, $flg, $msg);
+}
+
+sub processRequest {
+ my ($tag, $timeout, $err, $request) = @_;
+
+ if ($timeout) {
+ sendError($tag, ("Timed out waiting for all fragments"));
+ }
+ else {
+ wlog "Processing request\n";
+ my $cmd = shift(@$request);
+ wlog "Cmd is $cmd\n";
+ if (exists($HANDLERS{$cmd})) {
+ $HANDLERS{$cmd}->($tag, 0, $request);
+ }
+ else {
+ sendError($tag, ("Unknown command: $cmd"));
+ }
+ }
+}
+
+sub process {
+ my ($tag, $flg, $msg) = @_;
+
+
+ my $reply = $flg & $REPLY_FLAG;
+ my ($record, $cont, $start, $frags);
+
+ if ($reply) {
+ if (exists($REPLIES{$tag})) {
+ $record = $REPLIES{$tag};
+ ($cont, $start, $frags) = ($record->[0], $record->[1], $record->[2]);
+ }
+ else {
+ wlog("Warning: received reply to unregistered command (tag=$tag). Discarding.\n");
+ return;
+ }
+ }
+ else {
+ if (!exists($REQUESTS{$tag})) {
+ $REQUESTS{$tag} = [\&processRequest, time(), []];
+ wlog "New request ($tag)\n";
+ }
+ $record = $REQUESTS{$tag};
+ ($cont, $start, $frags) = ($$record[0], $$record[1], $$record[2]);
+ }
+
+ my $fin = $flg & $FINAL_FLAG;
+ my $err = $flg & $ERROR_FLAG;
+
+ push @$frags, $msg;
+
+ if ($fin) {
+ if ($reply) {
+ delete($REPLIES{$tag});
+ }
+ else {
+ delete($REQUESTS{$tag});
+ }
+ $cont->($tag, 0, $err, $frags);
+ }
+}
+
+sub checkTimeouts2 {
+ my ($hash) = @_;
+
+ my $now = time();
+ my @del = ();
+
+ my $k;
+ my $v;
+
+ while (($k, $v) = each(%$hash)) {
+ if ($now - $$v[1] > $REPLYTIMEOUT) {
+ push(@del, $k);
+ my $cont = $$v[0];
+ $cont->($k, 1, 0, ());
+ }
+ }
+
+ foreach $k (@del) {
+ delete $$hash{$k};
+ }
+}
+
+sub checkTimeouts {
+ checkTimeouts2(\%REQUESTS);
+ checkTimeouts2(\%REPLIES);
+ if ($LASTRECV != 0) {
+ my $dif = time() - $LASTRECV;
+ if ($dif >= $IDLETIMEOUT) {
+ die "Idle time exceeded";
+ }
+ }
+}
+
+sub recvOne {
+ my $data;
+ $SOCK->recv($data, 12);
+ if (length($data) > 0) {
+ wlog "Received $data\n";
+ eval { process(unpackData($data)); } || wlog "$@\n";
+ $LASTRECV = time();
+ }
+ else {
+ #sleep 250ms
+ select(undef, undef, undef, 0.25);
+ checkTimeouts();
+ }
+}
+
+sub registerCmd {
+ my ($tag, $cont) = @_;
+
+ $REPLIES{$tag} = [$cont, time(), ()];
+}
+
+
+sub mainloop {
+ my $cmd;
+ while(1) {
+ foreach $cmd (@CMDQ) {
+ sendCmd(@$cmd);
+ }
+ @CMDQ = ();
+ recvOne();
+ }
+}
+
+sub queueCmd {
+ push @CMDQ, [@_];
+}
+
+sub printreply {
+ my ($tag, $timeout, $err, $reply) = @_;
+ if ($timeout) {
+ wlog "Timed out waiting for reply to $tag\n";
+ }
+ else {
+ wlog "$$reply[0]\n";
+ }
+}
+
+sub nullCB {
+ my ($tag, $timeout, $err, $reply) = @_;
+}
+
+sub registerCB {
+ my ($tag, $timeout, $err, $reply) = @_;
+
+ if ($timeout) {
+ die "Failed to register (timeout)\n";
+ }
+ if ($err) {
+ die "Failed to register (service returned error: ".join("\n", @$reply).")";
+ }
+}
+
+sub register {
+ my ($tag, $timeout, $reply) = @_;
+ sendReply($tag, ("OK"));
+}
+
+
+sub shutdown {
+ my ($tag, $timeout, $msgs) = @_;
+
+ sendReply($tag, ("OK"));
+ wlog "Shutdown command received. Exiting\n";
+ exit 0;
+}
+
+sub submitjob {
+ my ($tag, $timeout, $msgs) = @_;
+ my $desc = $$msgs[0];
+ my @lines = split(/\n/, $desc);
+ my $line;
+ $JOBID = undef;
+ %JOB = ();
+ @JOBARGS = ();
+ %JOBENV = ();
+ foreach $line (@lines) {
+ $line =~ s/\\n/\n/;
+ $line =~ s/\\\\/\\/;
+ my @pair = split(/=/, $line, 2);
+ if ($pair[0] eq "arg") {
+ push @JOBARGS, $pair[1];
+ }
+ elsif ($pair[0] eq "env") {
+ my @ep = split(/=/, $pair[1], 2);
+ $JOBENV{"$ep[0]"} = $ep[1];
+ }
+ elsif ($pair[0] eq "identity") {
+ $JOBID = $pair[1];
+ }
+ else {
+ $JOB{$pair[0]} = $pair[1];
+ }
+ }
+ if (checkJob($tag)) {
+ sendCmd((\&nullCB, "JOBSTATUS", $JOBID, "$ACTIVE", "0", ""));
+ forkjob();
+ }
+}
+
+sub checkJob() {
+ my $tag = shift;
+ my $executable = $JOB{"executable"};
+ if (!(defined $JOBID)) {
+ sendReply($tag, ("Missing job identity"));
+ return 0;
+ }
+ elsif (!(defined $executable)) {
+ sendReply($tag, ("Missing executable"));
+ return 0;
+ }
+ else {
+ sendReply($tag, ("OK"));
+ return 1;
+ }
+}
+
+sub forkjob {
+ my ($pid, $status);
+ $pid = fork();
+ if (defined($pid)) {
+ if ($pid == 0) {
+ runjob();
+ }
+ else {
+ wlog "Forked process $pid. Waiting for its completion\n";
+ waitpid($pid, 0);
+ $status = $?;
+ wlog "Child process $pid terminated. Status is $status. $!\n";
+ queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$COMPLETED", "$status", "");
+ }
+ }
+ else {
+ queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$FAILED", "512", "Could not fork child process");
+ }
+}
+
+sub runjob {
+ my $executable = $JOB{"executable"};
+ my $stdout = $JOB{"stdout"};
+ my $stderr = $JOB{"stderr"};
+
+ wlog "Running $executable\n";
+ my $ename;
+ foreach $ename (keys %JOBENV) {
+ $ENV{$ename} = $JOBENV{$ename};
+ }
+ unshift @JOBARGS, $executable;
+ if (defined $stdout) {
+ close STDOUT;
+ open STDOUT, $stdout;
+ }
+ if (defined $stderr) {
+ close STDERR;
+ open STDERR, $stderr;
+ }
+ exec { $executable } @JOBARGS or queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$FAILED", "513", "Could not execute $executable: $!");
+ die "Could not execute $executable: $!";
+}
+
+my $MSG="0";
+
+my $myhost=`hostname -i`;
+$myhost =~ s/\s+$//;
+init();
+
+queueCmd(\&registerCB, "REGISTER", $ID, "wid://$ID");
+
+mainloop();
Property changes on: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
___________________________________________________________________
Name: svn:executable
+ *
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,25 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jan 19, 2008
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import org.globus.cog.abstraction.impl.execution.coaster.SubmitJobCommand;
+import org.globus.cog.karajan.workflow.service.AbstractRequestManager;
+import org.globus.cog.karajan.workflow.service.handlers.ChannelConfigurationHandler;
+import org.globus.cog.karajan.workflow.service.handlers.ShutdownHandler;
+import org.globus.cog.karajan.workflow.service.handlers.VersionHandler;
+
+public class CoasterRequestManager extends AbstractRequestManager {
+ public CoasterRequestManager() {
+ addHandler("VERSION", VersionHandler.class);
+ addHandler("CHANNELCONFIG", ChannelConfigurationHandler.class);
+ addHandler("SHUTDOWN", ShutdownHandler.class);
+ addHandler(SubmitJobCommand.NAME, SubmitJobHandler.class);
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,139 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jan 19, 2008
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.coaster.service.job.manager.JobQueue;
+import org.globus.cog.abstraction.coaster.service.local.JobStatusHandler;
+import org.globus.cog.abstraction.coaster.service.local.RegistrationHandler;
+import org.globus.cog.karajan.workflow.service.ConnectionHandler;
+import org.globus.cog.karajan.workflow.service.GSSService;
+import org.globus.cog.karajan.workflow.service.RequestManager;
+import org.globus.cog.karajan.workflow.service.ServiceRequestManager;
+import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+import org.globus.gsi.gssapi.auth.SelfAuthorization;
+
+public class CoasterService extends GSSService {
+ public static final Logger logger = Logger
+ .getLogger(CoasterService.class);
+
+ private String registrationURL, id;
+ private JobQueue jobQueue;
+ private LocalTCPService localService;
+ private Exception e;
+ private boolean done;
+
+ public CoasterService() throws IOException {
+ this(null, null);
+ }
+
+ public CoasterService(String registrationURL, String id)
+ throws IOException {
+ super();
+ this.registrationURL = registrationURL;
+ this.id = id;
+ setAuthorization(new SelfAuthorization());
+ RequestManager rm = new ServiceRequestManager();
+ rm.addHandler("REGISTER", RegistrationHandler.class);
+ rm.addHandler("JOBSTATUS", JobStatusHandler.class);
+ localService = new LocalTCPService(rm);
+ }
+
+ protected void handleConnection(Socket sock) {
+ logger.debug("Got connection");
+ try {
+ ConnectionHandler handler = new ConnectionHandler(this, sock,
+ new CoasterRequestManager());
+ handler.start();
+ }
+ catch (Exception e) {
+ logger.warn("Could not start connection handler", e);
+ }
+ }
+
+ public void start() {
+ super.start();
+ try {
+ localService.start();
+ jobQueue = new JobQueue(localService);
+ jobQueue.start();
+ localService.setWorkerManager(jobQueue.getWorkerManager());
+ logger.info("Started local service: " + localService.getContact());
+ if (id != null) {
+ try {
+ logger.info("Reserving channel for registration");
+ KarajanChannel channel = ChannelManager.getManager()
+ .reserveChannel(registrationURL, null);
+ logger.info("Sending registration");
+ RegistrationCommand reg = new RegistrationCommand(id,
+ "https://" + getHost() + ":" + getPort());
+ reg.execute(channel);
+ logger.info("Registration complete");
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to register service",
+ e);
+ }
+ }
+ logger.info("Started coaster service: " + this);
+ }
+ catch (Exception e) {
+ logger.error("Failed to start coaster service", e);
+ stop(e);
+ }
+ }
+
+ private void stop(Exception e) {
+ synchronized(this) {
+ this.e = e;
+ done = true;
+ notifyAll();
+ }
+ }
+
+ public void waitFor() throws Exception {
+ synchronized(this) {
+ while (!done) {
+ wait();
+ }
+ if (e != null) {
+ throw e;
+ }
+ }
+ }
+
+ public JobQueue getJobQueue() {
+ return jobQueue;
+ }
+
+ public static void main(String[] args) {
+ try {
+ CoasterService s;
+ if (args.length < 2) {
+ s = new CoasterService();
+ }
+ else {
+ s = new CoasterService(args[0], args[1]);
+ }
+ s.start();
+ //JobSubmissionTaskHandler.main(new String[0]);
+ s.waitFor();
+ System.exit(0);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/JobStatusCommand.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/JobStatusCommand.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/JobStatusCommand.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,66 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Feb 12, 2008
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+import org.globus.cog.abstraction.impl.common.execution.JobException;
+import org.globus.cog.abstraction.interfaces.Status;
+import org.globus.cog.karajan.workflow.service.ProtocolException;
+import org.globus.cog.karajan.workflow.service.commands.Command;
+
+public class JobStatusCommand extends Command {
+ public static final String NAME = "JOBSTATUS";
+
+ private String taskId;
+ private Status status;
+
+ public JobStatusCommand(String taskId, Status status) {
+ super(NAME);
+ this.taskId = taskId;
+ this.status = status;
+ }
+
+
+ public void send() throws ProtocolException {
+ try {
+ serialize();
+ }
+ catch (Exception e) {
+ throw new ProtocolException(
+ "Could not serialize status", e);
+ }
+ super.send();
+ }
+
+ protected void serialize() throws IOException {
+ addOutData(taskId);
+ addOutData(String.valueOf(status.getStatusCode()));
+ if (status.getException() instanceof JobException) {
+ addOutData(String.valueOf(((JobException) status.getException()).getExitCode()));
+ }
+ else {
+ addOutData("0");
+ }
+ StringBuffer sb = new StringBuffer();
+ if (status.getMessage() != null) {
+ sb.append(status.getMessage());
+ }
+ if (status.getException() != null) {
+ StringWriter sw = new StringWriter();
+ status.getException().printStackTrace(new PrintWriter(sw));
+ sb.append('\n');
+ sb.append(sw.toString());
+ }
+ addOutData(sb.toString());
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,67 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Feb 15, 2008
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.coaster.service.job.manager.WorkerManager;
+import org.globus.cog.karajan.workflow.service.GSSService;
+import org.globus.cog.karajan.workflow.service.RequestManager;
+import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
+import org.globus.cog.karajan.workflow.service.channels.ChannelException;
+import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+
+public class LocalTCPService extends GSSService implements Registering {
+ public static final Logger logger = Logger.getLogger(LocalTCPService.class);
+
+ private WorkerManager workerManager;
+
+ public LocalTCPService(RequestManager rm) throws IOException {
+ super(false, 0);
+ setRequestManager(rm);
+ }
+
+ public void registrationReceived(String id, String url, KarajanChannel channel) throws ChannelException {
+ if (logger.isInfoEnabled()) {
+ logger.info("Received registration: id = " + id + ", url = " + url);
+ }
+ ChannelContext cc = channel.getChannelContext();
+ cc.getChannelID().setLocalID("coaster");
+ cc.getChannelID().setRemoteID(id);
+ ChannelManager.getManager().registerChannel(cc.getChannelID(), channel);
+ workerManager.registrationReceived(id, url, channel.getChannelContext());
+ }
+
+ public WorkerManager getWorkerManager() {
+ return workerManager;
+ }
+
+ public void setWorkerManager(WorkerManager workerManager) {
+ this.workerManager = workerManager;
+ }
+
+ protected void handleConnection(Socket socket) {
+ try {
+ socket.setReceiveBufferSize(2048);
+ socket.setSendBufferSize(4096);
+ socket.setTcpNoDelay(true);
+ }
+ catch (SocketException e) {
+ e.printStackTrace();
+ }
+ super.handleConnection(socket);
+ }
+
+
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/Registering.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/Registering.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/Registering.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,17 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Feb 15, 2008
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import org.globus.cog.karajan.workflow.service.channels.ChannelException;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+
+public interface Registering {
+ void registrationReceived(String id, String url, KarajanChannel channel) throws ChannelException;
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/RegistrationCommand.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/RegistrationCommand.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/RegistrationCommand.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,22 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 20, 2005
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import org.globus.cog.abstraction.coaster.service.local.RegistrationHandler;
+import org.globus.cog.karajan.workflow.service.commands.Command;
+
+
+public class RegistrationCommand extends Command {
+ public RegistrationCommand(String id, String url) {
+ super(RegistrationHandler.NAME);
+ addOutData(id);
+ addOutData(url);
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/SubmitJobHandler.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,186 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 21, 2005
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.globus.cog.abstraction.coaster.service.job.manager.TaskNotifier;
+import org.globus.cog.abstraction.impl.common.IdentityImpl;
+import org.globus.cog.abstraction.impl.common.task.ExecutionServiceImpl;
+import org.globus.cog.abstraction.impl.common.task.IllegalSpecException;
+import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
+import org.globus.cog.abstraction.impl.common.task.ServiceContactImpl;
+import org.globus.cog.abstraction.impl.common.task.TaskImpl;
+import org.globus.cog.abstraction.interfaces.ExecutionService;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Task;
+import org.globus.cog.karajan.workflow.service.ProtocolException;
+import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
+import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
+import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
+
+public class SubmitJobHandler extends RequestHandler {
+ public void requestComplete() throws ProtocolException {
+ Task t;
+ try {
+ t = read(getInData(0));
+ ChannelContext channelContext = getChannel().getChannelContext();
+ new TaskNotifier(t, channelContext);
+ ((CoasterService) channelContext.getService()).getJobQueue()
+ .enqueue(t);
+ // make sure we'll have something to send notifications to
+ ChannelManager.getManager().reserveLongTerm(getChannel());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ throw new ProtocolException(
+ "Could not deserialize job description", e);
+ }
+ sendReply(t.getIdentity().toString());
+ }
+
+ private Task read(byte[] buf) throws IOException, ProtocolException,
+ IllegalSpecException {
+ Helper h = new Helper(buf);
+
+ Task t = new TaskImpl();
+ t.setType(Task.JOB_SUBMISSION);
+ JobSpecification spec = new JobSpecificationImpl();
+ t.setSpecification(spec);
+
+ String clientId = h.read("identity");
+ t.setIdentity(new IdentityImpl(clientId + "-"
+ + new IdentityImpl().getValue()));
+ spec.setExecutable(h.read("executable").intern());
+ spec.setDirectory(h.read("directory"));
+ spec.setStdInput(h.read("stdin"));
+ spec.setStdOutput(h.read("stdout"));
+ spec.setStdError(h.read("stderr"));
+ String s;
+ while ((s = h.read("arg")) != null) {
+ spec.addArgument(s);
+ }
+
+ while ((s = h.read("env")) != null) {
+ spec.addEnvironmentVariable(getKey(s), getValue(s));
+ }
+
+ while ((s = h.read("attr")) != null) {
+ spec.setAttribute(getKey(s), getValue(s));
+ }
+
+ ExecutionService service = new ExecutionServiceImpl();
+
+ setServiceParams(service, h.read("contact"), h.read("provider"), h
+ .read("jm").intern());
+ t.setService(0, service);
+
+ return t;
+ }
+
+ protected void setServiceParams(ExecutionService s, String contact,
+ String provider, String jm) throws IllegalSpecException {
+ if (jm == null) {
+ jm = "fork";
+ }
+
+ if (jm.equalsIgnoreCase("fork")) {
+ s.setProvider("local");
+ }
+ else {
+ s.setProvider("coaster");
+ s.setJobManager(jm);
+ }
+ s.setServiceContact(new ServiceContactImpl(contact));
+ }
+
+ private String getKey(String s) throws ProtocolException {
+ int i = s.indexOf('=');
+ if (i == -1) {
+ throw new ProtocolException("Invalid value: " + s);
+ }
+ return s.substring(0, i);
+ }
+
+ private String getValue(String s) {
+ int i = s.indexOf('=');
+ return s.substring(i + 1);
+ }
+
+ private static class Helper {
+ private InputStream is;
+ private String key, value;
+
+ public Helper(byte[] buf) {
+ is = new ByteArrayInputStream(buf);
+ }
+
+ public String read(String key) throws IOException, ProtocolException {
+ if (this.key == null) {
+ scan();
+ }
+ if (key.equals(this.key)) {
+ this.key = null;
+ return this.value;
+ }
+ else {
+ return null;
+ }
+ }
+
+ private void scan() throws IOException, ProtocolException {
+ key = null;
+ value = null;
+ StringBuffer sb = new StringBuffer();
+ int c = is.read();
+ boolean nl = false;
+ while (value == null) {
+ switch (c) {
+ case '=': {
+ if (key == null) {
+ key = sb.toString();
+ sb = new StringBuffer();
+ }
+ else {
+ sb.append((char) c);
+ }
+ break;
+ }
+ case '\\': {
+ c = is.read();
+ if (c == 'n') {
+ sb.append('\n');
+ }
+ else if (c == '\\') {
+ sb.append('\\');
+ }
+ break;
+ }
+ case -1:
+ case '\n': {
+ if (key == null) {
+ throw new ProtocolException("Invalid line: "
+ + sb.toString());
+ }
+ else {
+ value = sb.toString();
+ }
+ return;
+ }
+ default:
+ sb.append((char) c);
+ }
+ c = is.read();
+ }
+ }
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/VersionCommand.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/VersionCommand.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/VersionCommand.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,24 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 20, 2005
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import org.globus.cog.abstraction.coaster.service.local.VersionHandler;
+import org.globus.cog.karajan.workflow.service.commands.Command;
+
+
+// Command constructed with VersionHandler.NAME as the argument passed to
+// the Command superclass constructor.
+public class VersionCommand extends Command {
+ public VersionCommand() {
+ super(VersionHandler.NAME);
+ }
+
+ // Returns the data obtained from getInData() wrapped in a new String.
+ // NOTE(review): presumably getInData() yields the reply bytes, in which
+ // case they are decoded with the platform default charset - confirm.
+ public String getServerVersion() {
+ return new String(getInData());
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,33 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Feb 13, 2008
+ */
+package org.globus.cog.abstraction.coaster.service.job.manager;
+
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Task;
+
+/**
+ * Immutable pairing of a task with the maximum wall time taken from the
+ * "maxwalltime" attribute of its job specification.
+ */
+public class AssociatedTask {
+ public final Task task;
+ public final WallTime maxWallTime;
+
+ public AssociatedTask(Task task) {
+     this.task = task;
+     this.maxWallTime = getMaxWallTime(task);
+ }
+
+ /**
+  * Builds the wall time for a task from the "maxwalltime" attribute of
+  * its job specification, falling back to the string "10" when the
+  * attribute is not set.
+  */
+ private WallTime getMaxWallTime(Task t) {
+     final JobSpecification spec = (JobSpecification) t.getSpecification();
+     final Object requested = spec.getAttribute("maxwalltime");
+     return new WallTime(requested == null ? "10" : requested.toString());
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/CoasterQueueProcessor.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/CoasterQueueProcessor.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/CoasterQueueProcessor.java 2008-04-22 14:05:35 UTC (rev 1967)
@@ -0,0 +1,67 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Feb 13, 2008
+ */
+package org.globus.cog.abstraction.coaster.service.job.manager;
+
+import java.io.IOException;
+
+import org.globus.cog.abstraction.impl.common.StatusImpl;
+import org.globus.cog.abstraction.impl.common.task.ServiceContactImpl;
+import org.globus.cog.abstraction.interfaces.Status;
+import org.globus.cog.abstraction.interfaces.TaskHandler;
+
+public class CoasterQueueProcessor extends QueueProcessor {
+ private TaskHandler taskHandler;
+ private WorkerManager workerManager;
+ private String workdir;
+
+ public CoasterQueueProcessor(WorkerManager workerManager)
+ throws IOException {
+ super("Coaster Queue Processor");
+ this.workerManager = workerManager;
+ this.taskHandler = new CoasterTaskHandler(workerManager);
+ }
+
+ public void run() {
+ try {
+ workerManager.start();
+ AssociatedTask at;
+ while (!this.getShutdownFlag()) {
+ at = next();
+ Worker wr = workerManager.request(at.maxWallTime, at.task);
+ if (wr != null) {
+ remove();
+ if (wr.getStatus() != null) {
+ at.task.setStatus(wr.getStatus());
+ }
+ else {
+ try {
+ at.task.getService(0).setServiceContact(
+ new ServiceContactImpl(wr.getId()));
+ new WorkerTaskMonitor(at.task, workerManager, wr);
+ taskHandler.submit(at.task);
+ }
+ catch (Exception e) {
+ at.task.setStatus(new StatusImpl(Status.FAILED,
+ null, e));
+ }
+ }
+ }
+ if (hasWrapped()) {
+ synchronized(workerManager) {
+ workerManager.wa...
[truncated message content] |
|
From: <ha...@us...> - 2008-04-22 14:00:20
|
Revision: 1966
http://cogkit.svn.sourceforge.net/cogkit/?rev=1966&view=rev
Author: hategan
Date: 2008-04-22 07:00:10 -0700 (Tue, 22 Apr 2008)
Log Message:
-----------
updated service communication library
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/AbstractRequestManager.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ConnectionHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestManager.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Service.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ServiceContext.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UserContext.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/BufferingChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelManager.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/NullChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ReplyEvent.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/TagTable.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/ChannelConfigurationCommand.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/ChannelConfigurationHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/EchoHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/EventHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/GroupHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/RequestHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/ShutdownHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/StartHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/TestHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/UnknownCommandHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/test/Services.java
Added Paths:
-----------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ChannelFactory.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/GSSService.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UDPService.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractTCPChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelAttributes.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/TCPChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/UDPChannel.java
Removed Paths:
-------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractSocketChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSSocketChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/PlainSocketChannel.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/AbstractRequestManager.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/AbstractRequestManager.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/AbstractRequestManager.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -22,7 +22,7 @@
handlers = new Hashtable();
}
- protected void addHandler(String cmd, Class cls) {
+ public void addHandler(String cmd, Class cls) {
handlers.put(cmd, cls);
}
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ChannelFactory.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ChannelFactory.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ChannelFactory.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -0,0 +1,46 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Feb 14, 2008
+ */
+package org.globus.cog.karajan.workflow.service;
+
+import java.net.URI;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
+import org.globus.cog.karajan.workflow.service.channels.ChannelException;
+import org.globus.cog.karajan.workflow.service.channels.GSSChannel;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+import org.globus.cog.karajan.workflow.service.channels.TCPChannel;
+import org.globus.cog.karajan.workflow.service.channels.UDPChannel;
+
+/**
+ * Creates and starts the channel implementation that matches the scheme
+ * of a contact URI.
+ */
+public class ChannelFactory {
+ public static final Logger logger = Logger.getLogger(ChannelFactory.class);
+
+ /**
+  * Instantiates a channel for the given contact and starts it.
+  * A missing scheme, "tcps" and "https" all select a GSSChannel; "tcp"
+  * selects a TCPChannel and "udp" a UDPChannel.
+  *
+  * @param contact the URI of the remote endpoint
+  * @param context the channel context handed to the new channel
+  * @param rm the request manager handed to the new channel
+  * @return the newly created channel, after start() has been called on it
+  * @throws ChannelException declared by this method; raised by the
+  *         channel constructors or start() as applicable
+  * @throws IllegalArgumentException if the scheme is none of the above
+  */
+ public static KarajanChannel newChannel(URI contact, ChannelContext context, RequestManager rm)
+         throws ChannelException {
+     final String scheme = contact.getScheme();
+     final KarajanChannel channel;
+     if (scheme == null || "tcps".equals(scheme) || "https".equals(scheme)) {
+         channel = new GSSChannel(contact, rm, context);
+     }
+     else if ("tcp".equals(scheme)) {
+         channel = new TCPChannel(contact, context, rm);
+     }
+     else if ("udp".equals(scheme)) {
+         channel = new UDPChannel(contact, context, rm);
+     }
+     else {
+         throw new IllegalArgumentException("Scheme not supported: " + contact);
+     }
+     channel.start();
+     return channel;
+ }
+}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -10,7 +10,6 @@
package org.globus.cog.karajan.workflow.service;
import java.io.IOException;
-import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Hashtable;
@@ -18,30 +17,16 @@
import org.apache.log4j.Logger;
import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
-import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
-import org.globus.cog.karajan.workflow.service.channels.GSSSocketChannel;
import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
import org.globus.cog.karajan.workflow.service.commands.ChannelConfigurationCommand;
import org.globus.cog.karajan.workflow.service.commands.Command;
import org.globus.cog.karajan.workflow.service.commands.VersionCommand;
-import org.globus.gsi.GSIConstants;
-import org.globus.gsi.gssapi.GSSConstants;
-import org.globus.gsi.gssapi.GlobusGSSManagerImpl;
-import org.globus.gsi.gssapi.auth.Authorization;
-import org.globus.gsi.gssapi.auth.HostAuthorization;
-import org.globus.gsi.gssapi.auth.SelfAuthorization;
-import org.globus.gsi.gssapi.net.GssSocket;
-import org.globus.gsi.gssapi.net.GssSocketFactory;
-import org.gridforum.jgss.ExtendedGSSContext;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSManager;
public class Client {
private static final Logger logger = Logger.getLogger(Client.class);
private final URI contact;
- private Socket socket;
- private GSSSocketChannel channel;
+ private KarajanChannel channel;
private RequestManager requestManager;
private ChannelContext sc;
private static Service callback;
@@ -75,7 +60,12 @@
}
public static Client newClient(String contact, ChannelContext context) throws Exception {
- Client client = new Client(contact);
+ return newClient(contact, context, new ClientRequestManager());
+ }
+
+ public static Client newClient(String contact, ChannelContext context,
+ RequestManager requestManager) throws Exception {
+ Client client = new Client(contact, requestManager);
client.setChannelContext(context);
context.getChannelID().setClient(true);
synchronized (client) {
@@ -89,7 +79,12 @@
}
public Client(String contact) throws URISyntaxException {
+ this(contact, new ClientRequestManager());
+ }
+
+ public Client(String contact, RequestManager requestManager) throws URISyntaxException {
this.contact = new URI(contact);
+ this.requestManager = requestManager;
}
public void connect() throws Exception {
@@ -117,44 +112,15 @@
if (port == -1) {
port = 1984;
}
- HostAuthorization hostAuthz = new HostAuthorization("host");
-
- Authorization authz = new FallbackAuthorization(new Authorization[] { hostAuthz,
- SelfAuthorization.getInstance() });
-
-
- GSSCredential cred = sc.getCredential();
-
- GSSManager manager = new GlobusGSSManagerImpl();
- ExtendedGSSContext context = (ExtendedGSSContext) manager.createContext(null,
- GSSConstants.MECH_OID, cred, 2 * 3600);
-
- context.requestAnonymity(false);
- context.requestCredDeleg(false);
- context.setOption(GSSConstants.GSS_MODE, GSIConstants.MODE_SSL);
- context.setOption(GSSConstants.DELEGATION_TYPE, GSIConstants.DELEGATION_TYPE_LIMITED);
-
- socket = GssSocketFactory.getDefault().createSocket(host, port, context);
- socket.setKeepAlive(true);
- socket.setSoTimeout(0);
- ((GssSocket) socket).setWrapMode(GSIConstants.MODE_SSL.intValue());
- ((GssSocket) socket).setAuthorization(authz);
- requestManager = new ClientRequestManager();
- logger.info("Connected to " + host + ":" + port);
-
- sc.setRemoteContact(contact.toString());
- channel = new GSSSocketChannel((GssSocket) socket, requestManager, sc, true,
- contact.toString());
- channel.start();
-
- String callbackURL = null;
+ channel = ChannelFactory.newChannel(contact, sc, requestManager);
+ URI callbackURI = null;
if (sc.getConfiguration().hasOption(RemoteConfiguration.CALLBACK)) {
- callbackURL = ChannelManager.getManager().getCallbackURL();
+ callbackURI = channel.getCallbackURI();
}
String remoteID = getChannel().getChannelContext().getChannelID().getRemoteID();
ChannelConfigurationCommand ccc = new ChannelConfigurationCommand(
- sc.getConfiguration(), callbackURL);
+ sc.getConfiguration(), callbackURI);
ccc.execute(this.getChannel());
connected = true;
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ConnectionHandler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ConnectionHandler.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ConnectionHandler.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -12,29 +12,32 @@
import java.net.Socket;
import org.apache.log4j.Logger;
-import org.globus.cog.karajan.workflow.service.channels.AbstractSocketChannel;
+import org.globus.cog.karajan.workflow.service.channels.AbstractTCPChannel;
import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
-import org.globus.cog.karajan.workflow.service.channels.GSSSocketChannel;
-import org.globus.cog.karajan.workflow.service.channels.PlainSocketChannel;
+import org.globus.cog.karajan.workflow.service.channels.GSSChannel;
+import org.globus.cog.karajan.workflow.service.channels.TCPChannel;
import org.globus.gsi.gssapi.net.GssSocket;
public class ConnectionHandler {
private static final Logger logger = Logger.getLogger(ConnectionHandler.class);
private final Socket socket;
- private final AbstractSocketChannel channel;
+ private final AbstractTCPChannel channel;
private final RequestManager requestManager;
public ConnectionHandler(Service service, Socket socket) {
+ this(service, socket, null);
+ }
+
+ public ConnectionHandler(Service service, Socket socket, RequestManager requestManager) {
this.socket = socket;
- requestManager = new ServiceRequestManager();
+ this.requestManager = requestManager == null ? new ServiceRequestManager() : requestManager;
if (socket instanceof GssSocket) {
- channel = new GSSSocketChannel((GssSocket) socket, requestManager, new ChannelContext(
- service), false);
+ channel = new GSSChannel((GssSocket) socket, this.requestManager, new ChannelContext(
+ service));
}
else {
- channel = new PlainSocketChannel(socket, requestManager, new ChannelContext(service),
- false);
+ channel = new TCPChannel(socket, this.requestManager, new ChannelContext(service));
}
}
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/GSSService.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/GSSService.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/GSSService.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -0,0 +1,273 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 18, 2005
+ */
+package org.globus.cog.karajan.workflow.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.URI;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.common.AbstractionFactory;
+import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
+import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
+import org.globus.cog.abstraction.impl.common.task.TaskImpl;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Task;
+import org.globus.cog.abstraction.interfaces.TaskHandler;
+import org.globus.cog.util.ArgumentParser;
+import org.globus.cog.util.ArgumentParserException;
+import org.globus.cog.util.GridMap;
+import org.globus.gsi.GlobusCredential;
+import org.globus.gsi.GlobusCredentialException;
+import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
+import org.globus.gsi.gssapi.auth.SelfAuthorization;
+import org.globus.net.BaseServer;
+import org.globus.net.ServerSocketFactory;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+
+/**
+ * Socket based implementation of {@link Service} built on top of the
+ * Globus BaseServer. Every accepted connection is handed to a
+ * ConnectionHandler together with the optional request manager configured
+ * through {@link #setRequestManager(RequestManager)}.
+ */
+public class GSSService extends BaseServer implements Service {
+ // NOTE(review): the log category is Service, not GSSService; kept as is
+ // so that existing logging configuration keeps matching.
+ private static final Logger logger = Logger.getLogger(Service.class);
+ private ServiceContext context = new ServiceContext(this);
+ private Thread serverThread;
+ private boolean restricted;
+ private URI contact;
+ private RequestManager requestManager;
+
+ public GSSService() throws IOException {
+     super();
+ }
+
+ public GSSService(GSSCredential cred) throws IOException {
+     this(cred, 0);
+ }
+
+ public GSSService(boolean secure, int port) throws IOException {
+     super(secure, port);
+ }
+
+ public GSSService(GSSCredential cred, int port) throws IOException {
+     super(cred, port);
+ }
+
+ /**
+  * Creates a service using the given credential. When bindTo is not
+  * null, the server socket created by the superclass constructor is
+  * closed and replaced by one bound to that address.
+  */
+ public GSSService(GSSCredential cred, int port, InetAddress bindTo) throws IOException {
+     super(cred, port);
+     if (bindTo != null) {
+         // Release the socket opened by the superclass before rebinding,
+         // as the (boolean, int, InetAddress) constructor does; otherwise
+         // the first socket is leaked and keeps the port occupied.
+         this._server.close();
+         this._server = ServerSocketFactory.getDefault().createServerSocket(port, 50, bindTo);
+     }
+ }
+
+ public GSSService(int port) throws IOException {
+     this(true, port);
+ }
+
+ /**
+  * Creates a secure or plain service. When bindTo is not null, the
+  * server socket created by the superclass constructor is closed and
+  * replaced by one bound to that address.
+  */
+ public GSSService(boolean secure, int port, InetAddress bindTo) throws IOException {
+     super(secure, port);
+     if (bindTo != null) {
+         this._server.close();
+         this._server = ServerSocketFactory.getDefault().createServerSocket(port, 50, bindTo);
+     }
+ }
+
+ /** Sets the request manager passed to every new ConnectionHandler. */
+ protected void setRequestManager(RequestManager requestManager) {
+     this.requestManager = requestManager;
+ }
+
+ public void initialize() {
+     // prevent the server from being started by BaseServer constructors
+ }
+
+ /**
+  * Builds the credential the service runs with.
+  *
+  * @param personal if true the default user credential is used; if false
+  *        the host certificate/key pair is loaded instead
+  * @param hostcert host certificate location, or null for
+  *        /etc/grid-security/hostcert.pem (only used when !personal)
+  * @param hostkey host key location, or null for
+  *        /etc/grid-security/hostkey.pem (only used when !personal)
+  */
+ public static GSSCredential initializeCredentials(boolean personal, String hostcert,
+         String hostkey) throws GlobusCredentialException, GSSException {
+     if (!personal) {
+         return new GlobusGSSCredentialImpl(new GlobusCredential(hostcert != null ? hostcert
+                 : "/etc/grid-security/hostcert.pem", hostkey != null ? hostkey
+                 : "/etc/grid-security/hostkey.pem"), GSSCredential.INITIATE_AND_ACCEPT);
+     }
+     else {
+         return new GlobusGSSCredentialImpl(GlobusCredential.getDefaultCredential(),
+                 GSSCredential.INITIATE_AND_ACCEPT);
+     }
+ }
+
+ /**
+  * Wraps an accepted socket in a ConnectionHandler and starts it. A
+  * failure is logged as a warning and does not propagate.
+  */
+ protected void handleConnection(Socket socket) {
+     logger.debug("Got connection");
+     try {
+         ConnectionHandler handler = new ConnectionHandler(this, socket, requestManager);
+         handler.start();
+     }
+     catch (Exception e) {
+         logger.warn("Could not start connection handler", e);
+     }
+ }
+
+ /** Returns the contact URI computed by start(); null before that. */
+ public URI getContact() {
+     return contact;
+ }
+
+ /**
+  * Computes the contact URI from the protocol, host and port, then
+  * starts the server thread unless one was already created.
+  */
+ public void start() {
+     try {
+         contact = new URI(getProtocol(), null, getHost(), getPort(), null, null, null);
+     }
+     catch (Exception e) {
+         throw new RuntimeException(e);
+     }
+     if (serverThread == null) {
+         accept = true;
+         serverThread = new Thread(this);
+         serverThread.setName("Server: " + getContact());
+         serverThread.start();
+     }
+ }
+
+ public String toString() {
+     return String.valueOf(contact);
+ }
+
+ public boolean isRestricted() {
+     return restricted;
+ }
+
+ public void setRestricted(boolean restricted) {
+     this.restricted = restricted;
+ }
+
+ public ServiceContext getContext() {
+     return context;
+ }
+
+ /**
+  * Command line entry point: parses the options, creates the service in
+  * personal or shared mode, starts it and then sleeps forever.
+  */
+ public static void main(String[] args) {
+     ArgumentParser ap = new ArgumentParser();
+     ap.setExecutableName("cog-workflow-service");
+     ap.addFlag("personal", "Starts the service in personal mode. In personal mode, "
+             + "the service uses the user credential, and does not use a restricted "
+             + "execution environment. This is the default mode.");
+     ap.addFlag("shared", "Starts the service in shared mode. In shared mode, "
+             + "the service uses the host credentials, provides a restricted execution "
+             + "environment, and uses the grid-map file for authorization.");
+     ap.addFlag("nosec", "Starts the service without security");
+     ap.addFlag("local",
+             "Binds the socket to the local host. Connections from the network will not be possible.");
+     ap.addOption("port",
+             "Specifies the port that the service should be started on. The default is 1984.",
+             "port-number", ArgumentParser.OPTIONAL);
+     ap.addOption(
+             "gridmap",
+             "Specifies the location of the grid map file, which is used for "
+                     + "mapping certificate distinguished names to local user accounts. This options is "
+                     + "only meaningful if the service is started in shared mode. The default is "
+                     + GridMap.DEFAULT_GRID_MAP, "file", ArgumentParser.OPTIONAL);
+     ap.addOption("hostcert", "Indicates the location of the host certificate. This option "
+             + "is only used in shared mode. The default is /etc/grid-security/hostcert.pem",
+             "file", ArgumentParser.OPTIONAL);
+     ap.addOption("hostkey", "Indicates the location of the host key. This option "
+             + "is only used in shared mode. The default is /etc/grid-security/hostkey.pem",
+             "file", ArgumentParser.OPTIONAL);
+     ap.addFlag("help", "Displays usage information");
+     ap.addAlias("port", "p");
+     try {
+         ap.parse(args);
+     }
+     catch (ArgumentParserException e) {
+         ap.usage();
+         System.exit(1);
+     }
+     if (ap.isPresent("help")) {
+         ap.usage();
+         System.exit(1);
+     }
+
+     int port = 1984;
+     if (ap.isPresent("port")) {
+         port = ap.getIntValue("port");
+     }
+     boolean personal = true;
+     String hostcert = null;
+     String hostkey = null;
+     boolean bind = false;
+     boolean nosec = false;
+     if (ap.isPresent("shared")) {
+         if (ap.isPresent("gridmap")) {
+             System.setProperty("grid.mapfile", ap.getStringValue("gridmap"));
+         }
+         personal = false;
+         if (ap.isPresent("personal")) {
+             System.err.println("-shared and -personal are mutually exclusive");
+             System.exit(1);
+         }
+         if (ap.isPresent("hostcert")) {
+             hostcert = ap.getStringValue("hostcert");
+         }
+         if (ap.isPresent("hostkey")) {
+             hostkey = ap.getStringValue("hostkey");
+         }
+         // Probe the "local" provider with a dummy job: only a provider
+         // that rejects it with InvalidSecurityContextException is
+         // accepted for shared mode. A successful submit is turned into
+         // a failure by the explicit throw below.
+         Task task = new TaskImpl();
+         JobSpecification spec = new JobSpecificationImpl();
+         task.setType(Task.JOB_SUBMISSION);
+         spec.setExecutable("test");
+         task.setSpecification(spec);
+         try {
+             TaskHandler handler = AbstractionFactory.newExecutionTaskHandler("local");
+             handler.submit(task);
+             throw new Exception();
+         }
+         catch (InvalidSecurityContextException e) {
+             // This means that it's the gridmapped local provider
+         }
+         catch (Exception e) {
+             System.err.println("The service will not run in shared mode with the default local provider.");
+             System.err.println("Please install the grid-mapped local provider");
+             System.exit(2);
+         }
+     }
+     if (ap.isPresent("nosec")) {
+         nosec = true;
+     }
+     if (ap.isPresent("local")) {
+         bind = true;
+     }
+
+     try {
+         GSSService service;
+         InetAddress bindTo = null;
+         if (bind) {
+             bindTo = InetAddress.getLocalHost();
+         }
+         if (nosec) {
+             service = new GSSService(false, port, bindTo);
+         }
+         else {
+             service = new GSSService(initializeCredentials(personal, hostcert, hostkey), port,
+                     bindTo);
+         }
+         if (personal) {
+             service.setAuthorization(new SelfAuthorization());
+             service.setRestricted(false);
+         }
+         else {
+             service.setAuthorization(new GridMapAuthorization());
+             service.setRestricted(true);
+         }
+         if (bind) {
+             service.getContext().setLocal(true);
+         }
+         service.start();
+         System.out.println("Service started on port " + service.getPort());
+         // Keep the main thread alive while the server thread runs.
+         while (true) {
+             Thread.sleep(10000);
+         }
+     }
+     catch (Exception e) {
+         System.err.println("Could not start service:\n\t" + e.getMessage());
+         System.exit(1);
+     }
+ }
+}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestManager.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestManager.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestManager.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -14,4 +14,6 @@
public interface RequestManager {
RequestHandler handleInitialRequest(byte[] data)
throws NoSuchHandlerException;
+
+ void addHandler(String name, Class cls);
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -72,6 +72,15 @@
this.addOutData(str.getBytes());
}
+ /**
+  * Appends a 32-bit integer to the outgoing data as four bytes, least
+  * significant byte first.
+  *
+  * @param value the integer to encode
+  */
+ protected void addOutData(int value) {
+     final byte[] encoded = new byte[4];
+     for (int i = 0; i < encoded.length; i++) {
+         encoded[i] = (byte) ((value >> (8 * i)) & 0xff);
+     }
+     addOutData(encoded);
+ }
+
protected void sendError(String error) throws ProtocolException {
sendError(error, null);
}
@@ -98,8 +107,10 @@
public abstract void send() throws ProtocolException;
- protected abstract void dataReceived(byte[] data) throws ProtocolException;
+ // Default implementation: does nothing with the received data. This
+ // replaces the former abstract declaration, so subclasses no longer
+ // have to provide their own version.
+ protected void dataReceived(byte[] data) throws ProtocolException {
+ }
+
protected synchronized void addInData(byte[] data) {
synchronized (this) {
if (inData == null) {
@@ -146,9 +157,18 @@
if (inData == null) {
return null;
}
- return (byte[]) inData.get(index);
+ try {
+ return (byte[]) inData.get(index);
+ }
+ catch (IndexOutOfBoundsException e) {
+ throw new IllegalArgumentException("Missing command argument #" + (index + 1));
+ }
}
+ /**
+  * Returns the input data item at the given index as a String, decoded
+  * with the platform default charset.
+  *
+  * @param index zero-based position of the input data item
+  * @return the item converted to a String
+  */
+ public String getInDataAsString(int index) {
+     final byte[] raw = getInData(index);
+     return new String(raw);
+ }
+
public synchronized void setInData(int index, byte[] data) {
if (inData == null) {
inData = new LinkedList();
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Service.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Service.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Service.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -5,252 +5,18 @@
//----------------------------------------------------------------------
/*
- * Created on Jul 18, 2005
+ * Created on Feb 14, 2008
*/
package org.globus.cog.karajan.workflow.service;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
+import java.net.URI;
-import org.apache.log4j.Logger;
-import org.globus.cog.abstraction.impl.common.AbstractionFactory;
-import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
-import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
-import org.globus.cog.abstraction.impl.common.task.TaskImpl;
-import org.globus.cog.abstraction.interfaces.JobSpecification;
-import org.globus.cog.abstraction.interfaces.Task;
-import org.globus.cog.abstraction.interfaces.TaskHandler;
-import org.globus.cog.util.ArgumentParser;
-import org.globus.cog.util.ArgumentParserException;
-import org.globus.cog.util.GridMap;
-import org.globus.gsi.GlobusCredential;
-import org.globus.gsi.GlobusCredentialException;
-import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
-import org.globus.gsi.gssapi.auth.SelfAuthorization;
-import org.globus.net.BaseServer;
-import org.globus.net.ServerSocketFactory;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSException;
+public interface Service {
-public class Service extends BaseServer {
- private static final Logger logger = Logger.getLogger(Service.class);
- private ServiceContext context = new ServiceContext(this);
- private Thread serverThread;
- private boolean restricted;
+ boolean isRestricted();
- public Service(GSSCredential cred) throws IOException {
- this(cred, 0);
- }
+ URI getContact();
- public Service(boolean secure, int port) throws IOException {
- super(secure, port);
- super.initialize();
- }
+ ServiceContext getContext();
- public Service(GSSCredential cred, int port) throws IOException {
- super(cred, port);
- super.initialize();
- }
-
- public Service(GSSCredential cred, int port, InetAddress bindTo) throws IOException {
- super(cred, port);
- if (bindTo != null) {
- this._server = ServerSocketFactory.getDefault().createServerSocket(port, 50, bindTo);
- }
- super.initialize();
- }
-
- public Service(int port) throws IOException {
- this(true, port);
- }
-
- public Service(boolean secure, int port, InetAddress bindTo) throws IOException {
- super(secure, port);
- if (bindTo != null) {
- this._server.close();
- this._server = ServerSocketFactory.getDefault().createServerSocket(port, 50, bindTo);
- }
- super.initialize();
- }
-
- public void initialize() {
- // prevent the server from being started by BaseServer constructors
- }
-
- public static GSSCredential initializeCredentials(boolean personal, String hostcert,
- String hostkey) throws GlobusCredentialException, GSSException {
- if (!personal) {
- return new GlobusGSSCredentialImpl(new GlobusCredential(hostcert != null ? hostcert
- : "/etc/grid-security/hostcert.pem", hostkey != null ? hostkey
- : "/etc/grid-security/hostkey.pem"), GSSCredential.INITIATE_AND_ACCEPT);
- }
- else {
- return new GlobusGSSCredentialImpl(GlobusCredential.getDefaultCredential(),
- GSSCredential.INITIATE_AND_ACCEPT);
- }
- }
-
- protected void handleConnection(Socket socket) {
- logger.debug("Got connection");
- try {
- ConnectionHandler handler = new ConnectionHandler(this, socket);
- handler.start();
- }
- catch (Exception e) {
- logger.warn("Could not start connection handler", e);
- }
- }
-
- public String getContactString() {
- return "https://" + getHost() + ":" + getPort();
- }
-
- protected void start() {
- if (serverThread == null) {
- accept = true;
- serverThread = new Thread(this);
- serverThread.setName("Server: " + getContactString());
- serverThread.start();
- }
- }
-
- public boolean isRestricted() {
- return restricted;
- }
-
- public void setRestricted(boolean restricted) {
- this.restricted = restricted;
- }
-
- public ServiceContext getContext() {
- return context;
- }
-
- public static void main(String[] args) {
- ArgumentParser ap = new ArgumentParser();
- ap.setExecutableName("cog-workflow-service");
- ap.addFlag("personal", "Starts the service in personal mode. In personal mode, "
- + "the service uses the user credential, and does not use a restricted "
- + "execution environment. This is the default mode.");
- ap.addFlag("shared", "Starts the service in shared mode. In shared mode, "
- + "the service uses the host credentials, provides a restricted execution "
- + "environment, and uses the grid-map file for authorization.");
- ap.addFlag("nosec", "Starts the service without security");
- ap.addFlag("local",
- "Binds the socket to the local host. Connections from the network will not be possible.");
- ap.addOption("port",
- "Specifies the port that the service should be started on. The default is 1984.",
- "port-number", ArgumentParser.OPTIONAL);
- ap.addOption(
- "gridmap",
- "Specifies the location of the grid map file, which is used for "
- + "mapping certificate distinguished names to local user accounts. This options is "
- + "only meaningful if the service is started in shared mode. The default is "
- + GridMap.DEFAULT_GRID_MAP, "file", ArgumentParser.OPTIONAL);
- ap.addOption("hostcert", "Indicates the location of the host certificate. This option "
- + "is only used in shared mode. The default is /etc/grid-security/hostcert.pem",
- "file", ArgumentParser.OPTIONAL);
- ap.addOption("hostkey", "Indicates the location of the host key. This option "
- + "is only used in shared mode. The default is /etc/grid-security/hostkey.pem",
- "file", ArgumentParser.OPTIONAL);
- ap.addFlag("help", "Displays usage information");
- ap.addAlias("port", "p");
- try {
- ap.parse(args);
- }
- catch (ArgumentParserException e) {
- ap.usage();
- System.exit(1);
- }
- if (ap.isPresent("help")) {
- ap.usage();
- System.exit(1);
- }
-
- int port = 1984;
- if (ap.isPresent("port")) {
- port = ap.getIntValue("port");
- }
- boolean personal = true;
- String hostcert = null;
- String hostkey = null;
- boolean bind = false;
- boolean nosec = false;
- if (ap.isPresent("shared")) {
- if (ap.isPresent("gridmap")) {
- System.setProperty("grid.mapfile", ap.getStringValue("gridmap"));
- }
- personal = false;
- if (ap.isPresent("personal")) {
- System.err.println("-shared and -personal are mutually exclusive");
- System.exit(1);
- }
- if (ap.isPresent("hostcert")) {
- hostcert = ap.getStringValue("hostcert");
- }
- if (ap.isPresent("hostkey")) {
- hostkey = ap.getStringValue("hostkey");
- }
- Task task = new TaskImpl();
- JobSpecification spec = new JobSpecificationImpl();
- task.setType(Task.JOB_SUBMISSION);
- spec.setExecutable("test");
- task.setSpecification(spec);
- try {
- TaskHandler handler = AbstractionFactory.newExecutionTaskHandler("local");
- handler.submit(task);
- throw new Exception();
- }
- catch (InvalidSecurityContextException e) {
- // This means that it's the gridmapped local provider
- }
- catch (Exception e) {
- System.err.println("The service will not run in shared mode with the default local provider.");
- System.err.println("Please install the grid-mapped local provider");
- System.exit(2);
- }
- }
- if (ap.isPresent("nosec")) {
- nosec = true;
- }
- if (ap.isPresent("local")) {
- bind = true;
- }
-
- try {
- Service service;
- InetAddress bindTo = null;
- if (bind) {
- bindTo = InetAddress.getLocalHost();
- }
- if (nosec) {
- service = new Service(false, port, bindTo);
- }
- else {
- service = new Service(initializeCredentials(personal, hostcert, hostkey), port,
- bindTo);
- }
- if (personal) {
- service.setAuthorization(new SelfAuthorization());
- service.setRestricted(false);
- }
- else {
- service.setAuthorization(new GridMapAuthorization());
- service.setRestricted(true);
- }
- if (bind) {
- service.getContext().setLocal(true);
- }
- service.start();
- System.out.println("Service started on port " + service.getPort());
- while (true) {
- Thread.sleep(10000);
- }
- }
- catch (Exception e) {
- System.err.println("Could not start service:\n\t" + e.getMessage());
- System.exit(1);
- }
- }
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ServiceContext.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ServiceContext.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ServiceContext.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -38,6 +38,7 @@
}
public UserContext getUserContext(String name, ChannelContext channelContext) {
+ //TODO this doesn't make much sense
synchronized(users) {
String sname = String.valueOf(name);
UserContext uc = (UserContext) users.get(sname);
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UDPService.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UDPService.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UDPService.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -0,0 +1,137 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 18, 2005
+ */
+package org.globus.cog.karajan.workflow.service;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
+import org.globus.cog.karajan.workflow.service.channels.ChannelException;
+import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
+import org.globus.cog.karajan.workflow.service.channels.UDPChannel;
+
+public class UDPService implements Service, Runnable {
+ private static final Logger logger = Logger.getLogger(Service.class);
+
+ public static final int BUFFER_SIZE = 2048;
+
+ private ServiceContext context = new ServiceContext(this);
+ private Thread serverThread;
+ private boolean restricted;
+ private Map channels;
+ private DatagramSocket socket;
+ private RequestManager rm;
+ private byte[] recvbuf;
+ private boolean shutdownFlag;
+ private URI contact;
+
+ public UDPService(RequestManager rm) throws IOException {
+ this.rm = rm;
+ channels = new HashMap();
+ }
+
+ public URI getContact() {
+ return contact;
+ }
+
+ public void start() throws ChannelException {
+ recvbuf = new byte[BUFFER_SIZE];
+ try {
+ socket = new DatagramSocket();
+ InetSocketAddress addr = (InetSocketAddress) socket.getLocalSocketAddress();
+ contact = new URI("udp", null, InetAddress.getLocalHost().getHostAddress(),
+ addr.getPort(), null, null, null);
+
+ }
+ catch (Exception e) {
+ throw new ChannelException(e);
+ }
+ serverThread = new Thread(this);
+ serverThread.setName("UDP Service");
+ serverThread.start();
+ }
+
+ public void run() {
+ DatagramPacket dp = new DatagramPacket(recvbuf, recvbuf.length);
+ try {
+ while (!shutdownFlag) {
+ socket.receive(dp);
+ InetSocketAddress addr = (InetSocketAddress) dp.getSocketAddress();
+ UDPChannel channel;
+ synchronized (channels) {
+ channel = (UDPChannel) channels.get(addr);
+ if (channel == null) {
+ ChannelContext cc = new ChannelContext(context);
+ channel = new UDPChannel(socket, cc, rm, this, addr);
+ channels.put(addr, channel);
+ ChannelManager.getManager().registerChannel(
+ "udp://" + addr.getAddress().getHostAddress() + ":" + (addr.getPort() + 1), null, channel);
+ }
+ }
+ byte[] buf = new byte[dp.getLength()];
+ System.arraycopy(recvbuf, 0, buf, 0, dp.getLength());
+ try {
+ channel.dataReceived(buf);
+ }
+ catch (ChannelException e) {
+ logger.warn("Channel failed to process incoming message", e);
+ synchronized (channels) {
+ channels.remove(addr);
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public String toString() {
+ return String.valueOf(getContact());
+ }
+
+ public boolean isRestricted() {
+ return restricted;
+ }
+
+ public void setRestricted(boolean restricted) {
+ this.restricted = restricted;
+ }
+
+ public ServiceContext getContext() {
+ return context;
+ }
+
+ public void channelShutDown(UDPChannel channel) {
+ synchronized (channels) {
+ channels.remove(channel.getRemoteAddress());
+ }
+ }
+
+ public static void main(String[] args) {
+ try {
+ RequestManager rm = new ServiceRequestManager();
+ UDPService s = new UDPService(new ServiceRequestManager());
+ s.start();
+ System.err.println(s.getContact());
+ Thread.sleep(100000000);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UserContext.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UserContext.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UserContext.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -75,6 +75,9 @@
}
}
+ /**
+ * Returns the channel context of the channel that created this user context
+ */
public ChannelContext getChannelContext() {
return channelContext;
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -12,13 +12,15 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
+import java.net.URI;
import java.util.LinkedList;
import java.util.List;
import org.apache.log4j.Logger;
+import org.globus.cog.karajan.workflow.service.NoSuchHandlerException;
import org.globus.cog.karajan.workflow.service.ProtocolException;
import org.globus.cog.karajan.workflow.service.RequestManager;
+import org.globus.cog.karajan.workflow.service.Service;
import org.globus.cog.karajan.workflow.service.commands.Command;
import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
@@ -30,6 +32,8 @@
private final RequestManager requestManager;
private final List registeredMaps;
private boolean localShutdown, closed;
+ private String name;
+ private Service callbackService;
protected AbstractKarajanChannel(RequestManager requestManager, ChannelContext channelContext) {
if (channelContext != null) {
@@ -66,7 +70,7 @@
public void sendTaggedData(int tag, boolean fin, byte[] data) {
if (logger.isDebugEnabled()) {
- logger.debug(this + "REQ>: tag = " + tag + ", fin = " + fin + ", datalen = "
+ logger.debug(this + " REQ>: tag = " + tag + ", fin = " + fin + ", datalen = "
+ data.length + ", data = " + ppByteBuf(data));
}
sendTaggedData(tag, fin ? FINAL_FLAG : 0, data);
@@ -74,7 +78,7 @@
public void sendTaggedReply(int tag, byte[] data, boolean fin, boolean err) {
if (logger.isDebugEnabled()) {
- logger.debug(this + "REPL>: tag = " + tag + ", fin = " + fin + ", datalen = "
+ logger.debug(this + " REPL>: tag = " + tag + ", fin = " + fin + ", datalen = "
+ data.length + ", data = " + ppByteBuf(data));
}
int flags = REPLY_FLAG;
@@ -90,18 +94,10 @@
public ChannelContext getChannelContext() {
return context;
}
-
+
public void setChannelContext(ChannelContext context) {
this.context = context;
}
-
- protected void readFromStream(InputStream stream, ByteBuffer buf) throws IOException {
- int count = stream.read(buf.array());
- if (count == -1) {
- throw new EOFException("Connection closed");
- }
- buf.position(buf.position() + count);
- }
protected int readFromStream(InputStream stream, byte[] buf, int crt) throws IOException {
int count = stream.read(buf, crt, buf.length - crt);
@@ -111,6 +107,22 @@
return crt + count;
}
+ public static void pack(byte[] buf, int offset, int value) {
+ buf[offset] = (byte) (value & 0xff);
+ buf[offset + 1] = (byte) ((value >> 8) & 0xff);
+ buf[offset + 2] = (byte) ((value >> 16) & 0xff);
+ buf[offset + 3] = (byte) ((value >> 24) & 0xff);
+ }
+
+ public static int unpack(byte[] buf, int offset) {
+ int i = 0;
+ i += (buf[offset] & 0xff);
+ i += (buf[offset + 1] & 0xff) << 8;
+ i += (buf[offset + 2] & 0xff) << 16;
+ i += (buf[offset + 3] & 0xff) << 24;
+ return i;
+ }
+
public static String ppByteBuf(byte[] data) {
byte[] buf = new byte[Math.min(data.length, 256)];
for (int i = 0; i < buf.length; i++) {
@@ -156,7 +168,7 @@
public void close() {
closed = true;
}
-
+
public boolean isClosed() {
return closed;
}
@@ -172,4 +184,125 @@
public boolean isClient() {
return false;
}
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String toString() {
+ return getName();
+ }
+
+ public synchronized URI getCallbackURI() throws Exception {
+ ensureCallbackServiceStarted();
+ return getCallbackService().getContact();
+ }
+
+ public Service getCallbackService() {
+ return callbackService;
+ }
+
+ public void setCallbackService(Service callbackService) {
+ this.callbackService = callbackService;
+ }
+
+ protected void ensureCallbackServiceStarted() throws Exception {
+
+ }
+
+ protected void handleReply(int tag, boolean fin, boolean error, int len, byte[] data) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(this + " REPL<: tag = " + tag + ", fin = " + fin
+ + ", err = " + error + ", datalen = " + len + ", data = " + ppByteBuf(data));
+ }
+ Command cmd = getChannelContext().getRegisteredCommand(tag);
+ if (cmd != null) {
+ try {
+ cmd.replyReceived(data);
+ if (fin) {
+ if (error) {
+ cmd.errorReceived();
+ }
+ else {
+ cmd.receiveCompleted();
+ }
+ unregisterCommand(cmd);
+ }
+ }
+ catch (ProtocolException e) {
+ logger.warn("Exception caught while processing reply", e);
+ cmd.errorReceived(e.getMessage(), e);
+ }
+ }
+ else {
+ unregisteredSender(tag);
+ }
+ }
+
+ protected void unregisteredSender(int tag) {
+ logger.warn(getName() + " Recieved reply to unregistered sender. Tag: " + tag);
+ }
+
+ protected void handleRequest(int tag, boolean fin, boolean error, int len, byte[] data) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(this + " REQ<: tag = " + tag + ", fin = " + fin
+ + ", err = " + error + ", datalen = " + len + ", data = " + ppByteBuf(data));
+ }
+ RequestHandler handler = getChannelContext().getRegisteredHandler(tag);
+ try {
+ if (handler != null) {
+ handler.register(this);
+ handler.dataReceived(data);
+ }
+ else {
+ try {
+ handler = getRequestManager().handleInitialRequest(data);
+ handler.setId(tag);
+ registerHandler(handler, tag);
+ }
+ catch (NoSuchHandlerException e) {
+ logger.warn(getName() + "Could not handle request", e);
+ }
+
+ }
+ if (fin) {
+ try {
+ if (error) {
+ // TODO
+ }
+ else {
+ handler.receiveCompleted();
+ }
+ }
+ catch (ChannelIOException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Failed to process request", e);
+ }
+ if (!handler.isReplySent()) {
+ handler.sendError(e.toString(), e);
+ }
+ }
+ catch (Error e) {
+ if (!handler.isReplySent()) {
+ handler.sendError(e.toString(), e);
+ }
+ throw e;
+ }
+ finally {
+ unregisterHandler(tag);
+ }
+ }
+ }
+ catch (ProtocolException e) {
+ unregisterHandler(tag);
+ logger.warn(e);
+ }
+ }
}
Deleted: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractSocketChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractSocketChannel.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractSocketChannel.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -1,137 +0,0 @@
-//----------------------------------------------------------------------
-//This code is developed as part of the Java CoG Kit project
-//The terms of the license can be found at http://www.cogkit.org/license
-//This message may not be removed or altered.
-//----------------------------------------------------------------------
-
-/*
- * Created on Jul 21, 2006
- */
-package org.globus.cog.karajan.workflow.service.channels;
-
-import java.io.EOFException;
-import java.net.Socket;
-
-import org.globus.cog.karajan.workflow.service.RequestManager;
-
-public abstract class AbstractSocketChannel extends AbstractStreamKarajanChannel implements Runnable {
- private Socket socket;
- private boolean started;
- private Exception startException;
- private final boolean client;
- private boolean closing;
-
- public AbstractSocketChannel(RequestManager requestManager, ChannelContext channelContext,
- Socket socket, boolean client) {
- super(requestManager, channelContext);
- this.socket = socket;
- this.client = client;
- if (client) {
- setEndpoint("C(" + socket.getLocalAddress() + ")");
- }
- else {
- setEndpoint("S(" + socket.getLocalAddress() + ")");
- }
- }
-
- public synchronized void start() throws Exception {
- Thread thread = new Thread(this);
- thread.setDaemon(true);
- thread.setName("Chanel: " + getEndpoint());
- thread.start();
- while (!isStarted() && !isClosed() && startException == null) {
- try {
- wait();
- }
- catch (InterruptedException e) {
- }
- }
- if (startException != null) {
- logger.debug("Exception while starting channel", startException);
- throw startException;
- }
- logger.info(getEndpoint() + "Channel started");
- }
-
- public void run() {
- ChannelContext context = getChannelContext();
- try {
- try {
- setInputStream(socket.getInputStream());
- setOutputStream(socket.getOutputStream());
- started = true;
- }
- catch (Exception e) {
- startException = e;
- e.printStackTrace();
- return;
- }
- finally {
- synchronized (this) {
- notifyAll();
- }
- }
- initializeConnection();
- mainLoop();
- }
- catch (EOFException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Channel terminated", e);
- }
- context.notifyRegisteredListeners(e);
- }
- catch (Exception e) {
- if (!closing) {
- logger.warn("Exception in channel", e);
- context.notifyRegisteredListeners(e);
- }
- }
- finally {
- try {
- setLocalShutdown();
- ChannelManager.getManager().shutdownChannel(this);
- }
- catch (ShuttingDownException e) {
- logger.debug("Channel already shutting down");
- }
- catch (Exception e) {
- logger.warn(getEndpoint() + "Could not shutdown channel", e);
- }
- super.close();
- synchronized (this) {
- notify();
- }
- logger.info(getEndpoint() + "Channel terminated");
- }
- }
-
- protected void initializeConnection() {
-
- }
-
- public void close() {
- closing = true;
- try {
- if (!socket.isClosed()) {
- socket.close();
- logger.info(getEndpoint() + "Channel shut down");
- }
- }
- catch (Exception e) {
- logger.warn(getEndpoint() + "Failed to close socket", e);
- }
- super.close();
- }
-
- public boolean isClient() {
- return client;
- }
-
- public boolean isStarted() {
- return started;
- }
-
- public boolean isOffline() {
- return isClosed();
- }
-}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -12,31 +12,39 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
import org.apache.log4j.Logger;
-import org.globus.cog.karajan.workflow.service.NoSuchHandlerException;
-import org.globus.cog.karajan.workflow.service.ProtocolException;
import org.globus.cog.karajan.workflow.service.RequestManager;
-import org.globus.cog.karajan.workflow.service.commands.Command;
-import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
public abstract class AbstractStreamKarajanChannel extends AbstractKarajanChannel {
- public static final Logger logger = Logger.getLogger(AbstractStreamKarajanChannel.class);
-
+ public static final Logger logger = Logger.getLogger(AbstractStreamKarajanChannel.class);
+
+ public static final int STATE_IDLE = 0;
+ public static final int STATE_RECEIVING_DATA = 1;
+
+ public static final int HEADER_LEN = 12;
+
private InputStream inputStream;
private OutputStream outputStream;
- private String endpoint;
- private final ByteBuffer header;
- private final byte[] bheader;
-
- protected AbstractStreamKarajanChannel(RequestManager requestManager, ChannelContext channelContext) {
+ private URI contact;
+ private final byte[] rhdr;
+ private byte[] data;
+ private int dataPointer;
+ private int state, tag, flags, len;
+
+ protected AbstractStreamKarajanChannel(RequestManager requestManager,
+ ChannelContext channelContext) {
super(requestManager, channelContext);
- bheader = new byte[12];
- header = ByteBuffer.wrap(bheader);
+ rhdr = new byte[HEADER_LEN];
}
-
+
protected InputStream getInputStream() {
return inputStream;
}
@@ -53,146 +61,227 @@
this.outputStream = outputStream;
}
- public String getEndpoint() {
- return endpoint;
+ public URI getContact() {
+ return contact;
}
- public void setEndpoint(String endpoint) {
- this.endpoint = endpoint;
+ public void setContact(URI contact) {
+ this.contact = contact;
}
public synchronized void sendTaggedData(int tag, int flags, byte[] data) {
- header.clear();
- header.putInt(tag);
- header.putInt(flags);
- header.putInt(data.length);
- try {
- outputStream.write(bheader);
- outputStream.write(data);
- outputStream.flush();
+ getSender().enqueue(tag, flags, data, this);
+ }
+
+ protected boolean step() throws IOException {
+ int avail = inputStream.available();
+ if (avail == 0) {
+ return false;
}
- catch (IOException e) {
- throw new ChannelIOException(e);
+ if (state == STATE_IDLE && avail >= HEADER_LEN) {
+ readFromStream(inputStream, rhdr, 0);
+ tag = unpack(rhdr, 0);
+ flags = unpack(rhdr, 4);
+ len = unpack(rhdr, 8);
+ if (len > 20000) {
+ System.out.println("Big len: " + len);
+ }
+ data = new byte[len];
+ dataPointer = 0;
+ state = STATE_RECEIVING_DATA;
}
- }
-
-
- protected void mainLoop() throws IOException {
- ChannelContext context = getChannelContext();
- ByteBuffer header = ByteBuffer.allocate(12);
- while (!isClosed()) {
- header.clear();
- while (header.remaining() > 0) {
- readFromStream(inputStream, header);
- }
- header.rewind();
- IntBuffer iheader = header.asIntBuffer();
- int tag = iheader.get();
- int flags = iheader.get();
- int len = iheader.get();
- byte[] data = new byte[len];
- int crt = 0;
- while (crt < len) {
- crt = readFromStream(inputStream, data, crt);
- }
+ if (state == STATE_RECEIVING_DATA) {
+ while (avail > 0 && dataPointer < len) {
+ dataPointer += inputStream.read(data, dataPointer, Math.min(avail, len
+ - dataPointer));
+ avail = inputStream.available();
+ }
+ if (dataPointer == len) {
+ state = STATE_IDLE;
boolean fin = (flags & FINAL_FLAG) != 0;
boolean error = (flags & ERROR_FLAG) != 0;
if ((flags & REPLY_FLAG) != 0) {
// reply
- if (logger.isDebugEnabled()) {
- logger.debug(this + "REPL<: tag = " + tag + ", fin = " + fin + ", err = "
- + error + ", datalen = " + len + ", data = " + ppByteBuf(data));
- }
- Command cmd = context.getRegisteredCommand(tag);
-
- if (cmd != null) {
- cmd.replyReceived(data);
- if (fin) {
- if (error) {
- cmd.errorReceived();
- }
- else {
- cmd.receiveCompleted();
- }
- unregisterCommand(cmd);
- }
- }
- else {
- logger.warn(endpoint + "Recieved reply to unregistered sender. Tag: " + tag);
- }
+ handleReply(tag, fin, error, len, data);
}
else {
// request
- if (logger.isDebugEnabled()) {
- logger.debug(this + "REQ<: tag = " + tag + ", fin = " + fin + ", err = "
- + error + ", datalen = " + len + ", data = " + ppByteBuf(data));
+ handleRequest(tag, fin, error, len, data);
+ }
+ }
+ }
+ return true;
+ }
+
+ protected void register() {
+ getMultiplexer(FAST).register(this);
+ }
+
+ private static final int SENDER_COUNT = 1;
+ private static Sender[] sender;
+ private static int crtSender;
+
+ private static synchronized Sender getSender() {
+ if (sender == null) {
+ sender = new Sender[SENDER_COUNT];
+ for (int i = 0; i < SENDER_COUNT; i++) {
+ sender[i] = new Sender();
+ sender[i].start();
+ }
+ }
+ try {
+ return sender[crtSender++];
+ }
+ finally {
+ if (crtSender == SENDER_COUNT) {
+ crtSender = 0;
+ }
+ }
+ }
+
+ private static class SendEntry {
+ public final int tag, flags;
+ public final byte[] data;
+ public final AbstractStreamKarajanChannel channel;
+
+ public SendEntry(int tag, int flags, byte[] data, AbstractStreamKarajanChannel channel) {
+ this.tag = tag;
+ this.flags = flags;
+ this.data = data;
+ this.channel = channel;
+ }
+ }
+
+ private static class Sender extends Thread {
+ private final LinkedList queue;
+ private final byte[] shdr;
+
+ public Sender() {
+ super("Sender");
+ queue = new LinkedList();
+ setDaemon(true);
+ shdr = new byte[HEADER_LEN];
+ }
+
+ public synchronized void enqueue(int tag, int flags, byte[] data, AbstractStreamKarajanChannel channel) {
+ queue.addLast(new SendEntry(tag, flags, data, channel));
+ notify();
+ }
+
+ public void run() {
+ try {
+ SendEntry e;
+ while (true) {
+ synchronized (this) {
+ while (queue.isEmpty()) {
+ wait();
+ }
+ e = (SendEntry) queue.removeFirst();
}
- RequestHandler handler = context.getRegisteredHandler(tag);
try {
- if (handler != null) {
- handler.register(this);
- handler.dataReceived(data);
- if (fin) {
- try {
- handler.receiveCompleted();
- }
- catch (ChannelIOException e) {
- throw e;
- }
- catch (Exception e) {
- if (!handler.isReplySent()) {
- handler.sendError(e.toString(), e);
- }
- }
- catch (Error e) {
- if (!handler.isReplySent()) {
- handler.sendError(e.toString(), e);
- }
- throw e;
- }
- finally {
- unregisterHandler(tag);
- }
- }
+ send(e.tag, e.flags, e.data, e.channel.getOutputStream());
+ }
+ catch (Exception ex) {
+ ex.printStackTrace();
+ try {
+ e.channel.getChannelContext().getRegisteredCommand(e.tag).errorReceived(null, ex);
+ }
+ catch (Exception exx) {
+ logger.warn(exx);
+ }
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ // exit
+ }
+ }
+
+ private void send(int tag, int flags, byte[] data, OutputStream os) throws IOException {
+ pack(shdr, 0, tag);
+ pack(shdr, 4, flags);
+ pack(shdr, 8, data.length);
+ synchronized(os) {
+ os.write(shdr);
+ os.write(data);
+ os.flush();
+ }
+ }
+ }
+
+ private static final int MUX_COUNT = 2;
+ private static Multiplexer[] multiplexer;
+ public static final int FAST = 0;
+ public static final int SLOW = 1;
+
+...
[truncated message content] |
|
From: <ha...@us...> - 2008-04-22 13:57:30
|
Revision: 1965
http://cogkit.svn.sourceforge.net/cogkit/?rev=1965&view=rev
Author: hategan
Date: 2008-04-22 06:57:26 -0700 (Tue, 22 Apr 2008)
Log Message:
-----------
added take() method
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/util/Queue.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/util/Queue.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/util/Queue.java 2008-04-22 03:22:39 UTC (rev 1964)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/util/Queue.java 2008-04-22 13:57:26 UTC (rev 1965)
@@ -22,14 +22,23 @@
head.prev.next = e;
head.prev = e;
size++;
+ notifyAll();
}
public synchronized Object dequeue() {
Object o = head.next.obj;
head.next.next.prev = head;
head.next = head.next.next;
+ size--;
return o;
}
+
+ public synchronized Object take() throws InterruptedException {
+ while (size == 0) {
+ wait();
+ }
+ return dequeue();
+ }
public boolean isEmpty() {
return size == 0;
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-22 03:22:42
|
Revision: 1964
http://cogkit.svn.sourceforge.net/cogkit/?rev=1964&view=rev
Author: hategan
Date: 2008-04-21 20:22:39 -0700 (Mon, 21 Apr 2008)
Log Message:
-----------
properly handle job managers
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/execution/gt2/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/execution/gt2/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/execution/gt2/JobSubmissionTaskHandler.java 2008-04-18 03:30:35 UTC (rev 1963)
+++ trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/execution/gt2/JobSubmissionTaskHandler.java 2008-04-22 03:22:39 UTC (rev 1964)
@@ -20,9 +20,11 @@
import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
import org.globus.cog.abstraction.interfaces.DelegatedTaskHandler;
import org.globus.cog.abstraction.interfaces.Delegation;
+import org.globus.cog.abstraction.interfaces.ExecutionService;
import org.globus.cog.abstraction.interfaces.FileLocation;
import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.SecurityContext;
+import org.globus.cog.abstraction.interfaces.Service;
import org.globus.cog.abstraction.interfaces.ServiceContact;
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.Task;
@@ -126,8 +128,11 @@
String server = serviceContact.getContact();
// if the jobmanager attribute is specified, handle it
- String jobmanager = (String) this.task.getService(0).getAttribute(
- "jobmanager");
+ Service service = this.task.getService(0);
+ String jobmanager = null;
+ if (service instanceof ExecutionService) {
+ jobmanager = ((ExecutionService) service).getJobManager();
+ }
if (jobmanager != null) {
server = handleJobManager(server, jobmanager);
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-18 03:30:36
|
Revision: 1963
http://cogkit.svn.sourceforge.net/cogkit/?rev=1963&view=rev
Author: hategan
Date: 2008-04-17 20:30:35 -0700 (Thu, 17 Apr 2008)
Log Message:
-----------
testing my svn client
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/project.properties
Modified: trunk/current/src/cog/modules/karajan/project.properties
===================================================================
--- trunk/current/src/cog/modules/karajan/project.properties 2008-04-08 11:20:44 UTC (rev 1962)
+++ trunk/current/src/cog/modules/karajan/project.properties 2008-04-18 03:30:35 UTC (rev 1963)
@@ -1,7 +1,7 @@
module.name = karajan
-long.name = Karajan Workflow Engine
-version = 0.36-dev
-project = Java CoG Kit
-lib.deps = xpp3*.jar, xstream*.jar, backport-util-concurrent.jar
+long.name = Karajan Workflow Engine
+version = 0.36-dev
+project = Java CoG Kit
+lib.deps = xpp3*.jar, xstream*.jar, backport-util-concurrent.jar
exclude.dirs = com/thoughtworks/xstream/**/*.*
-debug = true
\ No newline at end of file
+debug = true
\ No newline at end of file
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-08 11:20:48
|
Revision: 1962
http://cogkit.svn.sourceforge.net/cogkit/?rev=1962&view=rev
Author: hategan
Date: 2008-04-08 04:20:44 -0700 (Tue, 08 Apr 2008)
Log Message:
-----------
fixed issues introduced by previous commit
Modified Paths:
--------------
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceContactImpl.java
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceImpl.java
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceContactImpl.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceContactImpl.java 2008-04-07 08:02:56 UTC (rev 1961)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceContactImpl.java 2008-04-08 11:20:44 UTC (rev 1962)
@@ -16,25 +16,29 @@
public static final ServiceContact LOCALHOST = new ServiceContactImpl(
"localhost");
- private String host, path;
+ private String host, path, contact;
private int port;
public ServiceContactImpl() {
}
public ServiceContactImpl(String contact) {
+ this.contact = contact;
parse(contact);
}
public ServiceContactImpl(String host, int port) {
this.host = host;
this.port = port;
+ buildContact();
}
public void setHost(String host) {
this.host = host;
port = -1;
+ buildContact();
}
+
public String getHost() {
return host;
@@ -42,6 +46,7 @@
public void setPort(int port) {
this.port = port;
+ buildContact();
}
public int getPort() {
@@ -49,11 +54,12 @@
}
public void setContact(String contact) {
+ this.contact = contact;
parse(contact);
}
public String getContact() {
- return host + (port == -1 ? "" : ":" + port) + (path == null ? "" : path);
+ return contact;
}
public boolean equals(Object o) {
@@ -69,10 +75,17 @@
}
private void parse(String contact) {
- int portsep = contact.indexOf(':');
- int pathsep = contact.indexOf('/');
+ int schemesep = contact.indexOf("://");
+ if (schemesep == -1) {
+ schemesep = 0;
+ }
+ else {
+ schemesep += 3;
+ }
+ int portsep = contact.indexOf(':', schemesep);
+ int pathsep = contact.indexOf('/', schemesep);
if (portsep != -1 && (pathsep == -1 || portsep < pathsep)) {
- host = contact.substring(0, portsep);
+ host = contact.substring(schemesep, portsep);
if (pathsep == -1) {
port = Integer.parseInt(contact.substring(portsep + 1));
path = null;
@@ -83,16 +96,20 @@
}
}
else if (pathsep != -1) {
- host = contact.substring(0, pathsep);
+ host = contact.substring(schemesep, pathsep);
port = -1;
path = contact.substring(pathsep);
}
else {
- host = contact;
+ host = contact.substring(schemesep);
port = -1;
path = null;
}
}
+
+ private void buildContact() {
+ this.contact = host + (port == -1 ? "" : (":" + port));
+ }
public String toString() {
return getContact();
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceImpl.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceImpl.java 2008-04-07 08:02:56 UTC (rev 1961)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceImpl.java 2008-04-08 11:20:44 UTC (rev 1962)
@@ -21,7 +21,7 @@
public class ServiceImpl implements Service {
private Identity identity = null;
private String name = "";
- private ServiceContact serviceContact = null;
+ private ServiceContact serviceContact;
private SecurityContext securityContext = null;
private Map attributes;
private String provider = null;
@@ -88,7 +88,7 @@
}
public ServiceContact getServiceContact() {
- return this.serviceContact;
+ return serviceContact;
}
public void setSecurityContext(SecurityContext securityContext) {
@@ -153,7 +153,7 @@
}
public String toString() {
- return this.serviceContact.toString() + "(" + this.provider + ")";
+ return serviceContact.toString() + "(" + this.provider + ")";
}
public int hashCode() {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-07 08:03:00
|
Revision: 1961
http://cogkit.svn.sourceforge.net/cogkit/?rev=1961&view=rev
Author: hategan
Date: 2008-04-07 01:02:56 -0700 (Mon, 07 Apr 2008)
Log Message:
-----------
a security context is not the place to throw such an exception
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-webdav/src/org/globus/cog/abstraction/impl/file/webdav/WebDAVSecurityContextImpl.java
Modified: trunk/current/src/cog/modules/provider-webdav/src/org/globus/cog/abstraction/impl/file/webdav/WebDAVSecurityContextImpl.java
===================================================================
--- trunk/current/src/cog/modules/provider-webdav/src/org/globus/cog/abstraction/impl/file/webdav/WebDAVSecurityContextImpl.java 2008-04-07 08:02:39 UTC (rev 1960)
+++ trunk/current/src/cog/modules/provider-webdav/src/org/globus/cog/abstraction/impl/file/webdav/WebDAVSecurityContextImpl.java 2008-04-07 08:02:56 UTC (rev 1961)
@@ -10,7 +10,6 @@
import java.util.Hashtable;
import org.apache.log4j.Logger;
-import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
import org.globus.cog.abstraction.interfaces.SecurityContext;
public class WebDAVSecurityContextImpl implements SecurityContext {
@@ -41,11 +40,7 @@
}
}
- public Object getCredentials() throws InvalidSecurityContextException {
- if (credentials == null) {
- throw new InvalidSecurityContextException(
- "WebDAV provider cannot handle default credentials. Please provide a valid WebDAV credential");
- }
+ public Object getCredentials() {
return this.credentials;
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-07 08:02:41
|
Revision: 1960
http://cogkit.svn.sourceforge.net/cogkit/?rev=1960&view=rev
Author: hategan
Date: 2008-04-07 01:02:39 -0700 (Mon, 07 Apr 2008)
Log Message:
-----------
removed exception from SecurityContext.getCredentials()
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/SSHSecurityContextImpl.java
Modified: trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/SSHSecurityContextImpl.java
===================================================================
--- trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/SSHSecurityContextImpl.java 2008-04-07 08:01:52 UTC (rev 1959)
+++ trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/SSHSecurityContextImpl.java 2008-04-07 08:02:39 UTC (rev 1960)
@@ -9,7 +9,6 @@
import java.util.Hashtable;
import org.apache.log4j.Logger;
-import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
import org.globus.cog.abstraction.interfaces.SecurityContext;
public class SSHSecurityContextImpl implements SecurityContext {
@@ -36,7 +35,7 @@
this.credentials = credentials;
}
- public Object getCredentials() throws InvalidSecurityContextException {
+ public Object getCredentials() {
return this.credentials;
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-07 08:01:56
|
Revision: 1959
http://cogkit.svn.sourceforge.net/cogkit/?rev=1959&view=rev
Author: hategan
Date: 2008-04-07 01:01:52 -0700 (Mon, 07 Apr 2008)
Log Message:
-----------
a security context is not the place to throw such an exception
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/file/ftp/FTPSecurityContextImpl.java
Modified: trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/file/ftp/FTPSecurityContextImpl.java
===================================================================
--- trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/file/ftp/FTPSecurityContextImpl.java 2008-04-07 08:01:05 UTC (rev 1958)
+++ trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/file/ftp/FTPSecurityContextImpl.java 2008-04-07 08:01:52 UTC (rev 1959)
@@ -10,7 +10,6 @@
import java.util.Hashtable;
import org.apache.log4j.Logger;
-import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
import org.globus.cog.abstraction.interfaces.SecurityContext;
public class FTPSecurityContextImpl implements SecurityContext {
@@ -40,11 +39,7 @@
}
}
- public Object getCredentials() throws InvalidSecurityContextException {
- if (credentials == null) {
- throw new InvalidSecurityContextException(
- "FTP provider cannot handle default credentials. Please provide a valid FTP credential");
- }
+ public Object getCredentials() {
return this.credentials;
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-07 08:01:13
|
Revision: 1958
http://cogkit.svn.sourceforge.net/cogkit/?rev=1958&view=rev
Author: hategan
Date: 2008-04-07 01:01:05 -0700 (Mon, 07 Apr 2008)
Log Message:
-----------
changes
Modified Paths:
--------------
trunk/current/src/cog/modules/abstraction-common/CHANGES.txt
Modified: trunk/current/src/cog/modules/abstraction-common/CHANGES.txt
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/CHANGES.txt 2008-04-07 07:54:31 UTC (rev 1957)
+++ trunk/current/src/cog/modules/abstraction-common/CHANGES.txt 2008-04-07 08:01:05 UTC (rev 1958)
@@ -1,3 +1,8 @@
+(04/07/2008)
+
+*** SecurityContextImpl, ServiceImpl, and ServiceContactImpl now
+ properly implement equals() and hashCode()
+
(03/26/2008)
*** Made a few objects serializable.
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-07 07:54:33
|
Revision: 1957
http://cogkit.svn.sourceforge.net/cogkit/?rev=1957&view=rev
Author: hategan
Date: 2008-04-07 00:54:31 -0700 (Mon, 07 Apr 2008)
Log Message:
-----------
properly implement equals and hashCode
Modified Paths:
--------------
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/SecurityContextImpl.java
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceContactImpl.java
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceImpl.java
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/SecurityContext.java
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/ServiceContact.java
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/SecurityContextImpl.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/SecurityContextImpl.java 2008-04-05 13:59:25 UTC (rev 1956)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/SecurityContextImpl.java 2008-04-07 07:54:31 UTC (rev 1957)
@@ -38,7 +38,6 @@
}
public String getAlias() {
-
return this.alias;
}
@@ -54,4 +53,29 @@
public Object getAttribute(String name) {
return this.attributes.get(name);
}
+
+ public int hashCode() {
+ return (credentials == null ? 0 : credentials.hashCode()) + attributes.hashCode();
+ }
+
+ public boolean equals(Object o) {
+ if (o instanceof SecurityContext) {
+ SecurityContext sc = (SecurityContext) o;
+ if ((credentials == null && sc.getCredentials() == null) || credentials.equals(sc.getCredentials())) {
+ if (o instanceof SecurityContextImpl) {
+ SecurityContextImpl sci = (SecurityContextImpl) o;
+ return attributes.equals(sci.attributes);
+ }
+ else {
+ return true;
+ }
+ }
+ else {
+ return false;
+ }
+ }
+ else {
+ return false;
+ }
+ }
}
\ No newline at end of file
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceContactImpl.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceContactImpl.java 2008-04-05 13:59:25 UTC (rev 1956)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceContactImpl.java 2008-04-07 07:54:31 UTC (rev 1957)
@@ -6,10 +6,6 @@
package org.globus.cog.abstraction.impl.common.task;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.StringTokenizer;
-
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.interfaces.ServiceContact;
@@ -17,26 +13,17 @@
static Logger logger = Logger.getLogger(ServiceContactImpl.class.getName());
- private static final byte HOST = 1;
-
- private static final byte PORT = 2;
-
- private static final byte CONTACT = 3;
-
public static final ServiceContact LOCALHOST = new ServiceContactImpl(
"localhost");
- private String host = null;
+ private String host, path;
+ private int port;
- private int port = -1;
-
- private String contact = null;
-
public ServiceContactImpl() {
}
public ServiceContactImpl(String contact) {
- this.contact = contact;
+ parse(contact);
}
public ServiceContactImpl(String host, int port) {
@@ -46,10 +33,11 @@
public void setHost(String host) {
this.host = host;
+ port = -1;
}
public String getHost() {
- return get(HOST);
+ return host;
}
public void setPort(int port) {
@@ -57,117 +45,53 @@
}
public int getPort() {
- String p = get(PORT);
- if (p == null) {
- return -1;
- }
- return Integer.parseInt(p);
-
+ return port;
}
public void setContact(String contact) {
- this.contact = contact;
+ parse(contact);
}
public String getContact() {
- return get(CONTACT);
+ return host + (port == -1 ? "" : ":" + port) + (path == null ? "" : path);
}
- public boolean equals(ServiceContact serviceContact) {
- return this.getContact().equalsIgnoreCase(serviceContact.getContact());
- }
-
- public boolean equals(Object object) {
- if (!(object instanceof ServiceContact)) {
- return false;
+ public boolean equals(Object o) {
+ if (o instanceof ServiceContact) {
+ ServiceContact sc = (ServiceContact) o;
+ return getContact().equals(sc.getContact());
}
- return this.toString().equalsIgnoreCase(
- ((ServiceContact) object).toString());
+ return false;
}
public int hashCode() {
- return this.getContact().toLowerCase().hashCode();
+ return this.getContact().hashCode();
}
- private String get(byte element) {
- switch (element) {
- case CONTACT:
- // if the service contact url is already set
- if (this.contact != null) {
- return this.contact;
+ private void parse(String contact) {
+ int portsep = contact.indexOf(':');
+ int pathsep = contact.indexOf('/');
+ if (portsep != -1 && (pathsep == -1 || portsep < pathsep)) {
+ host = contact.substring(0, portsep);
+ if (pathsep == -1) {
+ port = Integer.parseInt(contact.substring(portsep + 1));
+ path = null;
}
- // if not try to generate one
- else if (this.host != null) {
- if (this.port != -1) {
- return this.host + ":" + Integer.toString(this.port);
- } else {
- return this.host;
- }
+ else {
+ port = Integer.parseInt(contact.substring(portsep + 1, pathsep));
+ path = contact.substring(pathsep);
}
- return null;
- case HOST:
- if (this.host == null && this.contact != null) {
- // try to cast the contact into a URI and then get the
- // host and port from it
- if (this.contact.indexOf("://") != -1) {
- try {
- URI uri = new URI(this.contact);
- logger.debug("Host from URI: " + uri.getHost());
- return uri.getHost();
- } catch (URISyntaxException e) {
- logger
- .debug("Cannot retreive host information from the URI");
- return null;
- }
- } else {
- String c = this.contact;
- StringTokenizer st = new StringTokenizer(c, ":");
- try {
- String h = st.nextToken();
- logger.debug("Host from contact: " + h);
- return h;
- } catch (Exception ex) {
- logger
- .debug("Cannot retreive port information from the contact");
- return null;
- }
- }
- }
- return this.host;
- case PORT:
- if (this.port == -1 && this.contact != null) {
- // try to cast the contact into a URI and then get the
- // port from it
- if (this.contact.indexOf("://") != -1) {
- try {
- URI uri = new URI(this.contact);
- logger.debug("Port from URI: " + uri.getPort());
- return Integer.toString(uri.getPort());
- } catch (URISyntaxException e) {
- logger
- .debug("Cannot retreive port information from the URI");
- return null;
- }
- } else {
- String c = this.contact;
- StringTokenizer st = new StringTokenizer(c, ":");
- try {
- st.nextToken();
- String p = st.nextToken();
- logger.debug("Port from contact: " + p);
- return p;
- } catch (Exception ex) {
- logger
- .debug("Cannot retreive port information from the contact");
- return null;
- }
- }
- }
- return Integer.toString(this.port);
- default:
- break;
}
- return null;
+ else if (pathsep != -1) {
+ host = contact.substring(0, pathsep);
+ port = -1;
+ path = contact.substring(pathsep);
+ }
+ else {
+ host = contact;
+ port = -1;
+ path = null;
+ }
}
public String toString() {
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceImpl.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceImpl.java 2008-04-05 13:59:25 UTC (rev 1956)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/task/ServiceImpl.java 2008-04-07 07:54:31 UTC (rev 1957)
@@ -142,7 +142,7 @@
public Enumeration getAllAttributes() {
return new Vector(getAttributeNames()).elements();
}
-
+
public Collection getAttributeNames() {
if (attributes != null) {
return attributes.keySet();
@@ -155,4 +155,23 @@
public String toString() {
return this.serviceContact.toString() + "(" + this.provider + ")";
}
+
+ public int hashCode() {
+ return serviceContact.hashCode() + provider.hashCode()
+ + (securityContext == null ? 0 : securityContext.hashCode());
+ }
+
+ public boolean equals(Object o) {
+ if (o instanceof Service) {
+ Service s = (Service) o;
+ return serviceContact.equals(s.getServiceContact())
+ && provider.equals(s.getProvider())
+ && equals(securityContext, s.getSecurityContext());
+ }
+ return false;
+ }
+
+ private boolean equals(Object o1, Object o2) {
+ return o1 == null ? o2 == null : o1.equals(o2);
+ }
}
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/SecurityContext.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/SecurityContext.java 2008-04-05 13:59:25 UTC (rev 1956)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/SecurityContext.java 2008-04-07 07:54:31 UTC (rev 1957)
@@ -6,7 +6,6 @@
package org.globus.cog.abstraction.interfaces;
-import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
import org.ietf.jgss.GSSCredential;
/**
@@ -24,7 +23,7 @@
/**
* Returns the credentials for this <code>SecurityContext</code>
*/
- public Object getCredentials() throws InvalidSecurityContextException;
+ public Object getCredentials();
public void setAlias(String alias);
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/ServiceContact.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/ServiceContact.java 2008-04-05 13:59:25 UTC (rev 1956)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/ServiceContact.java 2008-04-07 07:54:31 UTC (rev 1957)
@@ -42,10 +42,4 @@
* Returns the entire contact string of this <code>ServiceContact</code>
*/
public String getContact();
-
- /**
- * Checks if the given <code>ServiceContact</code> is equal to this
- * <code>ServiceContact</code>.
- */
- public boolean equals(ServiceContact serviceContact);
}
\ No newline at end of file
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-05 13:59:29
|
Revision: 1956
http://cogkit.svn.sourceforge.net/cogkit/?rev=1956&view=rev
Author: hategan
Date: 2008-04-05 06:59:25 -0700 (Sat, 05 Apr 2008)
Log Message:
-----------
fixed data channel reuse stuff
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-gt2/CHANGES.txt
trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/file/gridftp/FileResourceImpl.java
trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/file/gridftp/old/FileResourceImpl.java
Modified: trunk/current/src/cog/modules/provider-gt2/CHANGES.txt
===================================================================
--- trunk/current/src/cog/modules/provider-gt2/CHANGES.txt 2008-04-04 12:29:00 UTC (rev 1955)
+++ trunk/current/src/cog/modules/provider-gt2/CHANGES.txt 2008-04-05 13:59:25 UTC (rev 1956)
@@ -1,3 +1,7 @@
+(04/05/2008)
+
+*** Fixed data channel reuse behavior
+
(02/11/2008)
*** The last commit broke things (mlst seems picky).
Modified: trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/file/gridftp/FileResourceImpl.java
===================================================================
--- trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/file/gridftp/FileResourceImpl.java 2008-04-04 12:29:00 UTC (rev 1955)
+++ trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/file/gridftp/FileResourceImpl.java 2008-04-05 13:59:25 UTC (rev 1956)
@@ -69,7 +69,7 @@
}
else {
try {
- initializeDataChannel();
+ initializeDataChannel(RETRIEVE);
Vector v = this.getGridFTPClient().mlsd();
ArrayList list = new ArrayList();
Iterator i = v.iterator();
@@ -126,7 +126,7 @@
}
else {
try {
- initializeDataChannel();
+ initializeDataChannel(RETRIEVE);
Vector v = this.getGridFTPClient().mlsd(directory);
ArrayList list = new ArrayList();
Iterator i = v.iterator();
Modified: trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/file/gridftp/old/FileResourceImpl.java
===================================================================
--- trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/file/gridftp/old/FileResourceImpl.java 2008-04-04 12:29:00 UTC (rev 1955)
+++ trunk/current/src/cog/modules/provider-gt2/src/org/globus/cog/abstraction/impl/file/gridftp/old/FileResourceImpl.java 2008-04-05 13:59:25 UTC (rev 1956)
@@ -61,6 +61,9 @@
public class FileResourceImpl extends AbstractFTPFileResource {
public static final Logger logger = Logger
.getLogger(FileResourceImpl.class);
+
+ protected static final boolean STORE = true;
+ protected static final boolean RETRIEVE = false;
/**
* By default JGlobus sets this to 6000 ms. Experience has proved that it
@@ -71,6 +74,7 @@
private GridFTPClient gridFTPClient;
private boolean dataChannelReuse;
private boolean dataChannelInitialized;
+ private boolean dataChannelDirection;
/** throws InvalidProviderException */
public FileResourceImpl() throws Exception {
@@ -102,10 +106,18 @@
gridFTPClient = new GridFTPClient(host, port);
Reply r = gridFTPClient.getLastReply();
- if (r != null && r.getMessage().indexOf("GridFTP Server 2.3") != -1) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Initial reply: " + r.getMessage());
+ }
+ if (r != null && r.getMessage().indexOf("Virtual Broken GridFTP Server") != -1) {
+ dataChannelReuse = false;
+ }
+ else {
dataChannelReuse = true;
- logger.debug("GridFTP version is 2.3. Enabling data channel reuse.");
}
+ if (logger.isDebugEnabled()) {
+ logger.debug("Data channel reuse: " + dataChannelReuse);
+ }
gridFTPClient.setClientWaitParams(MAX_REPLY_WAIT_TIME,
Session.DEFAULT_WAIT_DELAY);
GSSCredential proxy = (GSSCredential) getSecurityContext()
@@ -124,14 +136,28 @@
"Error communicating with the GridFTP server", e);
}
}
+
+ public boolean getDataChannelReuse() {
+ return dataChannelReuse;
+ }
+
+ public void setDataChannelReuse(boolean dataChannelReuse) {
+ this.dataChannelReuse = dataChannelReuse;
+ this.dataChannelInitialized = false;
+ }
- protected void initializeDataChannel() throws ClientException,
+ protected void initializeDataChannel(boolean mode) throws ClientException,
ServerException, IOException {
- if (!dataChannelInitialized || !dataChannelReuse) {
- gridFTPClient.setPassiveMode(true);
+ if (!dataChannelInitialized || !dataChannelReuse || dataChannelDirection != mode) {
+ gridFTPClient.setPassiveMode(mode);
dataChannelInitialized = true;
+ dataChannelDirection = mode;
}
}
+
+ protected void resetDataChannel() {
+ dataChannelInitialized = false;
+ }
protected void setSecurityOptions(GridFTPClient client)
throws ServerException, IOException {
@@ -214,7 +240,7 @@
Vector gridFileList = new Vector();
try {
- this.initializeDataChannel();
+ this.initializeDataChannel(RETRIEVE);
Enumeration list = gridFTPClient.list().elements();
while (list.hasMoreElements()) {
gridFileList.add(createGridFile((FileInfo) list.nextElement()));
@@ -309,7 +335,7 @@
public void get(String remoteFileName, DataSink sink,
MarkerListener mListener) throws FileResourceException {
try {
- initializeDataChannel();
+ initializeDataChannel(RETRIEVE);
gridFTPClient.get(remoteFileName, sink, mListener);
}
catch (Exception e) {
@@ -321,7 +347,7 @@
public void get(String remoteFileName, File localFile)
throws FileResourceException {
try {
- initializeDataChannel();
+ initializeDataChannel(RETRIEVE);
gridFTPClient.get(remoteFileName, localFile);
}
catch (Exception e) {
@@ -341,7 +367,7 @@
final ProgressMonitor progressMonitor) throws FileResourceException {
File localFile = new File(localFileName);
try {
- initializeDataChannel();
+ initializeDataChannel(RETRIEVE);
final long size = localFile.length();
DataSink sink;
if (progressMonitor != null) {
@@ -374,7 +400,7 @@
final File localFile = new File(localFileName);
try {
- initializeDataChannel();
+ initializeDataChannel(STORE);
final long size = localFile.length();
DataSource source;
if (progressMonitor != null) {
@@ -408,7 +434,7 @@
throws FileResourceException {
try {
- initializeDataChannel();
+ initializeDataChannel(STORE);
gridFTPClient.put(localFile, remoteFileName, append);
}
catch (Exception e) {
@@ -424,7 +450,7 @@
public void put(DataSource source, String remoteFileName,
MarkerListener mListener) throws FileResourceException {
try {
- initializeDataChannel();
+ initializeDataChannel(STORE);
gridFTPClient.put(remoteFileName, source, mListener);
}
catch (Exception e) {
@@ -495,7 +521,7 @@
setCurrentDirectory(currentDirectory);
}
catch (Exception e) {
- // do nothihng
+ // do nothing
}
}
return isDir;
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-04 12:29:05
|
Revision: 1955
http://cogkit.svn.sourceforge.net/cogkit/?rev=1955&view=rev
Author: hategan
Date: 2008-04-04 05:29:00 -0700 (Fri, 04 Apr 2008)
Log Message:
-----------
added a socket timeout of 2 minutes
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/Ssh.java
Modified: trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/Ssh.java
===================================================================
--- trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/Ssh.java 2008-04-03 11:07:10 UTC (rev 1954)
+++ trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/Ssh.java 2008-04-04 12:29:00 UTC (rev 1955)
@@ -50,7 +50,9 @@
import com.sshtools.j2ssh.transport.publickey.SshPrivateKeyFile;
public class Ssh {
- static Logger logger = Logger.getLogger(Ssh.class.getName());
+ public static final Logger logger = Logger.getLogger(Ssh.class);
+
+ public static final int DEFAULT_SOCKET_TIMEOUT = 120 * 1000;
protected String host;
protected int port = 22;
protected String username;
@@ -199,6 +201,7 @@
logger.debug("Connecting....");
}
+ client.setSocketTimeout(DEFAULT_SOCKET_TIMEOUT);
client.connect(properties, new AbstractHostKeyVerification() {
public void onUnknownHost(String hostname,
String fingerprint) throws InvalidHostFileException {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-03 11:07:14
|
Revision: 1954
http://cogkit.svn.sourceforge.net/cogkit/?rev=1954&view=rev
Author: hategan
Date: 2008-04-03 04:07:10 -0700 (Thu, 03 Apr 2008)
Log Message:
-----------
moved connection shutdown outside of the synchronized block
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/SSHChannelManager.java
trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/SSHConnectionBundle.java
Modified: trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/SSHChannelManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/SSHChannelManager.java 2008-03-26 14:47:59 UTC (rev 1953)
+++ trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/SSHChannelManager.java 2008-04-03 11:07:10 UTC (rev 1954)
@@ -179,9 +179,8 @@
public void run() {
try {
- List shutdown = new ArrayList();
+ List shutdownList = new ArrayList();
while (true) {
- shutdown.clear();
Thread.sleep(REAP_INTERVAL);
synchronized (bundles) {
Iterator i = bundles.entrySet().iterator();
@@ -189,11 +188,21 @@
Map.Entry e = (Entry) i.next();
ConnectionID ix = (ConnectionID) e.getKey();
SSHConnectionBundle bundle = (SSHConnectionBundle) e.getValue();
- if (!bundle.shutdownIdleConnections()) {
+ if (!bundle.shutdownIdleConnections(shutdownList)) {
i.remove();
}
}
}
+ Iterator i = shutdownList.iterator();
+ while (i.hasNext()) {
+ try {
+ ((Runnable) i.next()).run();
+ }
+ catch (Exception e) {
+ logger.warn("Failed to shut down SSH connection", e);
+ }
+ }
+ shutdownList.clear();
}
}
catch (InterruptedException e) {
Modified: trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/SSHConnectionBundle.java
===================================================================
--- trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/SSHConnectionBundle.java 2008-03-26 14:47:59 UTC (rev 1953)
+++ trunk/current/src/cog/modules/provider-ssh/src/org/globus/cog/abstraction/impl/ssh/SSHConnectionBundle.java 2008-04-03 11:07:10 UTC (rev 1954)
@@ -157,18 +157,22 @@
}
}
- public boolean shutdownIdleConnections() {
+ public boolean shutdownIdleConnections(List tasks) {
long crt = System.currentTimeMillis();
boolean anyActive = false;
synchronized (connections) {
Iterator i = connections.iterator();
while (i.hasNext()) {
- Connection c = (Connection) i.next();
+ final Connection c = (Connection) i.next();
if (c.sessionCount == 0 && crt - c.idleTime > MAX_IDLE_TIME) {
- if (logger.isDebugEnabled()) {
- logger.debug("Shutting down idle connection for " + id);
- }
- c.ssh.disconnect();
+ tasks.add(new Runnable() {
+ public void run() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Shutting down idle connection for " + id);
+ }
+ c.ssh.disconnect();
+ }
+ });
}
else {
anyActive = true;
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
Revision: 1953
http://cogkit.svn.sourceforge.net/cogkit/?rev=1953&view=rev
Author: hategan
Date: 2008-03-26 07:47:59 -0700 (Wed, 26 Mar 2008)
Log Message:
-----------
re-reverted previous commit on this topic
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-condor/src/org/globus/cog/abstraction/impl/execution/condor/DescriptionFileGenerator.java
Modified: trunk/current/src/cog/modules/provider-condor/src/org/globus/cog/abstraction/impl/execution/condor/DescriptionFileGenerator.java
===================================================================
--- trunk/current/src/cog/modules/provider-condor/src/org/globus/cog/abstraction/impl/execution/condor/DescriptionFileGenerator.java 2008-03-26 14:46:02 UTC (rev 1952)
+++ trunk/current/src/cog/modules/provider-condor/src/org/globus/cog/abstraction/impl/execution/condor/DescriptionFileGenerator.java 2008-03-26 14:47:59 UTC (rev 1953)
@@ -21,8 +21,7 @@
descriptionFile = new File(descriptionFileName);
return descriptionFile;
} else {
- descriptionFile = File.createTempFile(Long.toString(task
- .getIdentity().getValue()), ".desc");
+ descriptionFile = File.createTempFile(task.getIdentity().getValue(), ".desc");
constructDescriptionFile(descriptionFile, task);
return descriptionFile;
@@ -78,8 +77,7 @@
// set the default log (if not specified)
String log = (String) specification.getAttribute("log");
if (log == null) {
- File logFile = File.createTempFile(Long.toString(task.getIdentity()
- .getValue()), ".log");
+ File logFile = File.createTempFile(task.getIdentity().getValue(), ".log");
log = logFile.getAbsolutePath();
specification.setAttribute("log", log);
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-03-26 14:46:36
|
Revision: 1952
http://cogkit.svn.sourceforge.net/cogkit/?rev=1952&view=rev
Author: hategan
Date: 2008-03-26 07:46:02 -0700 (Wed, 26 Mar 2008)
Log Message:
-----------
changes
Modified Paths:
--------------
trunk/current/src/cog/modules/abstraction-common/CHANGES.txt
Modified: trunk/current/src/cog/modules/abstraction-common/CHANGES.txt
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/CHANGES.txt 2008-03-26 14:43:13 UTC (rev 1951)
+++ trunk/current/src/cog/modules/abstraction-common/CHANGES.txt 2008-03-26 14:46:02 UTC (rev 1952)
@@ -1,3 +1,13 @@
+(03/26/2008)
+
+*** Made a few objects serializable.
+
+*** Removed heavy use of hashtables for things where fields
+ work much better.
+
+*** Changed implementation of Identity to be more conservative
+ towards memory usage (i.e. removed the silly URI)
+
(02/12/2008)
*** Finally fixed the notification order. A task is now
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-03-26 14:43:27
|
Revision: 1951
http://cogkit.svn.sourceforge.net/cogkit/?rev=1951&view=rev
Author: hategan
Date: 2008-03-26 07:43:13 -0700 (Wed, 26 Mar 2008)
Log Message:
-----------
updated code that relies on identity.getValue being a long
Modified Paths:
--------------
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/taskgraph/TaskGraphImpl.java
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/fileTransfer/DelegatedFileTransferHandler.java
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskGraphMarshaller.java
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskGraphUnmarshaller.java
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskMarshaller.java
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskUnmarshaller.java
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/taskgraph/TaskGraphImpl.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/taskgraph/TaskGraphImpl.java 2008-03-26 14:42:20 UTC (rev 1950)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/taskgraph/TaskGraphImpl.java 2008-03-26 14:43:13 UTC (rev 1951)
@@ -330,6 +330,6 @@
}
public int hashCode() {
- return (int) this.id.getValue();
+ return this.id.hashCode();
}
}
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/fileTransfer/DelegatedFileTransferHandler.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/fileTransfer/DelegatedFileTransferHandler.java 2008-03-26 14:42:20 UTC (rev 1950)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/fileTransfer/DelegatedFileTransferHandler.java 2008-03-26 14:43:13 UTC (rev 1951)
@@ -241,8 +241,7 @@
if (this.sourceResource != null) {
if (this.sourceResource.isDirectory(spec.getSource())) {
if (localDestination == null) {
- localDestination = File.createTempFile(Long
- .toString(this.task.getIdentity().getValue()),
+ localDestination = File.createTempFile(this.task.getIdentity().getValue(),
null);
localDestination.delete();
localDestination.mkdir();
@@ -264,8 +263,7 @@
}
else {
if (localDestination == null) {
- localDestination = File.createTempFile(Long
- .toString(this.task.getIdentity().getValue()),
+ localDestination = File.createTempFile(this.task.getIdentity().getValue(),
null);
}
if (logger.isDebugEnabled()) {
@@ -284,8 +282,8 @@
else {
if (localDestination == null) {
localDestination = File
- .createTempFile(Long.toString(this.task
- .getIdentity().getValue()), null);
+ .createTempFile(this.task
+ .getIdentity().getValue(), null);
}
transferWithHandler(service.getProvider(), service,
LOCAL_SERVICE, spec.getSource(), localDestination);
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskGraphMarshaller.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskGraphMarshaller.java 2008-03-26 14:42:20 UTC (rev 1950)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskGraphMarshaller.java 2008-03-26 14:43:13 UTC (rev 1951)
@@ -43,8 +43,7 @@
TaskGraph xmlTaskGraph)
throws MarshalException {
// set identity
- xmlTaskGraph.setIdentity(
- Long.toString(taskGraph.getIdentity().getValue()));
+ xmlTaskGraph.setIdentity(taskGraph.getIdentity().getValue());
//set name
String name = taskGraph.getName();
@@ -194,8 +193,8 @@
ExecutableObject from = pair.getFrom();
ExecutableObject to = pair.getTo();
Dependency xmlDependency = new Dependency();
- xmlDependency.setFrom(Long.toString(from.getIdentity().getValue()));
- xmlDependency.setTo(Long.toString(to.getIdentity().getValue()));
+ xmlDependency.setFrom(from.getIdentity().getValue());
+ xmlDependency.setTo(to.getIdentity().getValue());
xmlDependencyList.addDependency(xmlDependency);
}
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskGraphUnmarshaller.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskGraphUnmarshaller.java 2008-03-26 14:42:20 UTC (rev 1950)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskGraphUnmarshaller.java 2008-03-26 14:43:13 UTC (rev 1951)
@@ -45,7 +45,7 @@
String xmlIdentity = xmlTaskGraph.getIdentity();
if (xmlIdentity != null && xmlIdentity.length() > 0) {
Identity identity = new IdentityImpl();
- identity.setValue(Long.parseLong(xmlIdentity.trim()));
+ identity.setValue(xmlIdentity.trim());
taskGraph.setIdentity(identity);
}
@@ -144,10 +144,11 @@
Dependency xmlDependency = (Dependency) en.nextElement();
Identity from = new IdentityImpl();
- from.setValue(Long.parseLong(xmlDependency.getFrom().trim()));
+ //this obviously ignores the namespace
+ from.setValue(xmlDependency.getFrom().trim());
Identity to = new IdentityImpl();
- to.setValue(Long.parseLong(xmlDependency.getTo().trim()));
+ to.setValue(xmlDependency.getTo().trim());
dependency.add(taskGraph.get(from), taskGraph.get(to));
}
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskMarshaller.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskMarshaller.java 2008-03-26 14:42:20 UTC (rev 1950)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskMarshaller.java 2008-03-26 14:43:13 UTC (rev 1951)
@@ -40,7 +40,7 @@
org.globus.cog.abstraction.interfaces.Task task, Task xmlTask)
throws MarshalException {
// set the task identity
- xmlTask.setIdentity(Long.toString(task.getIdentity().getValue()));
+ xmlTask.setIdentity(task.getIdentity().getValue());
// set the task name
String name = task.getName();
@@ -108,8 +108,8 @@
org.globus.cog.abstraction.interfaces.Service service = (org.globus.cog.abstraction.interfaces.Service) iterator
.next();
Service xmlService = new Service();
- xmlService.setIdentity(Long.toString(service.getIdentity()
- .getValue()));
+ xmlService.setIdentity(service.getIdentity()
+ .getValue());
String sname = service.getName();
if (sname != null && sname.length() > 0) {
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskUnmarshaller.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskUnmarshaller.java 2008-03-26 14:42:20 UTC (rev 1950)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/xml/TaskUnmarshaller.java 2008-03-26 14:43:13 UTC (rev 1951)
@@ -52,7 +52,7 @@
String xmlIdentity = xmlTask.getIdentity();
if (xmlIdentity != null && xmlIdentity.length() > 0) {
Identity identity = new IdentityImpl();
- identity.setValue(Long.parseLong(xmlIdentity.trim()));
+ identity.setValue(xmlIdentity.trim());
task.setIdentity(identity);
}
@@ -119,7 +119,7 @@
org.globus.cog.abstraction.interfaces.Service service = new ServiceImpl();
Identity identity = new IdentityImpl();
- identity.setValue(Long.parseLong(xmlService.getIdentity()));
+ identity.setValue(xmlService.getIdentity());
service.setIdentity(identity);
service.setName(xmlService.getName());
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-03-26 14:42:35
|
Revision: 1950
http://cogkit.svn.sourceforge.net/cogkit/?rev=1950&view=rev
Author: hategan
Date: 2008-03-26 07:42:20 -0700 (Wed, 26 Mar 2008)
Log Message:
-----------
made status and service contact serializable
Modified Paths:
--------------
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/ServiceContact.java
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/Status.java
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/ServiceContact.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/ServiceContact.java 2008-03-26 14:41:19 UTC (rev 1949)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/ServiceContact.java 2008-03-26 14:42:20 UTC (rev 1950)
@@ -6,11 +6,13 @@
package org.globus.cog.abstraction.interfaces;
+import java.io.Serializable;
+
/**
* This interface abstracts the endpoint service handle of remote Grid
* services.
*/
-public interface ServiceContact {
+public interface ServiceContact extends Serializable {
/**
* Sets the host element of this <code>ServiceContact</code>
*/
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/Status.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/Status.java 2008-03-26 14:41:19 UTC (rev 1949)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/Status.java 2008-03-26 14:42:20 UTC (rev 1950)
@@ -6,12 +6,13 @@
package org.globus.cog.abstraction.interfaces;
+import java.io.Serializable;
import java.util.Date;
/**
* An execution status associated with an <code>ExecutableObject</code>.
*/
-public interface Status {
+public interface Status extends Serializable {
/**
* The <code>ExecutableObject</code> is not submitted to the remote
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-03-26 14:42:24
|
Revision: 1949
http://cogkit.svn.sourceforge.net/cogkit/?rev=1949&view=rev
Author: hategan
Date: 2008-03-26 07:41:19 -0700 (Wed, 26 Mar 2008)
Log Message:
-----------
made file location serializable
Modified Paths:
--------------
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/FileLocation.java
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/FileLocation.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/FileLocation.java 2008-03-26 14:40:52 UTC (rev 1948)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/interfaces/FileLocation.java 2008-03-26 14:41:19 UTC (rev 1949)
@@ -9,8 +9,10 @@
*/
package org.globus.cog.abstraction.interfaces;
-public interface FileLocation {
+import java.io.Serializable;
+public interface FileLocation extends Serializable {
+
/**
* Specifies that nothing should be done with a job output stream or that
* there is nothing provided on the input stream.
@@ -62,7 +64,7 @@
int getCode();
- public static class Impl implements FileLocation {
+ public static class Impl implements FileLocation, Serializable {
private int code;
public Impl(int code) {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|