|
From: <ha...@us...> - 2008-10-20 21:45:56
|
Revision: 2239
http://cogkit.svn.sourceforge.net/cogkit/?rev=2239&view=rev
Author: hategan
Date: 2008-10-20 21:45:51 +0000 (Mon, 20 Oct 2008)
Log Message:
-----------
shut down workers concurrently
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java 2008-10-20 21:45:32 UTC (rev 2238)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java 2008-10-20 21:45:51 UTC (rev 2239)
@@ -21,6 +21,7 @@
import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+import org.globus.cog.karajan.workflow.service.commands.Command;
import org.globus.cog.karajan.workflow.service.commands.ShutdownCommand;
public class Worker implements StatusListener {
@@ -158,14 +159,15 @@
return this.channelContext;
}
- public void shutdown() {
+ public ShutdownCallback shutdown() {
try {
logger.info("Shutting down worker: " + this);
KarajanChannel channel = ChannelManager.getManager()
.reserveChannel(channelContext);
ShutdownCommand sc = new ShutdownCommand();
- sc.execute(channel);
- logger.info("Worker shut down: " + this);
+ ShutdownCallback stc = new ShutdownCallback();
+ sc.executeAsync(channel, stc);
+ return stc;
}
catch (Exception e) {
logger
@@ -179,6 +181,7 @@
catch (Exception ee) {
logger.info("Failed to cancel worker task", ee);
}
+ return null;
}
}
@@ -195,4 +198,32 @@
shutdownAfter(1);
}
}
+
+ public static class ShutdownCallback implements Command.Callback {
+ private boolean done;
+
+ public void errorReceived(Command cmd, String msg, Exception t) {
+ logger.info("Worker shut down failed: " + this + ". " + msg, t);
+ synchronized(this) {
+ done = true;
+ notifyAll();
+ }
+ }
+
+ public void replyReceived(Command cmd) {
+ logger.info("Worker shut down: " + this);
+ synchronized(this) {
+ done = true;
+ notifyAll();
+ }
+ }
+
+ public void waitFor() throws InterruptedException {
+ synchronized(this) {
+ while(!done) {
+ wait();
+ }
+ }
+ }
+ }
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|