From: <ha...@us...> - 2008-10-20 21:45:56
|
Revision: 2239
          http://cogkit.svn.sourceforge.net/cogkit/?rev=2239&view=rev
Author:   hategan
Date:     2008-10-20 21:45:51 +0000 (Mon, 20 Oct 2008)

Log Message:
-----------
shut down workers concurrently

Modified Paths:
--------------
    trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java

Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java	2008-10-20 21:45:32 UTC (rev 2238)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java	2008-10-20 21:45:51 UTC (rev 2239)
@@ -21,6 +21,7 @@
 import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
 import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
 import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+import org.globus.cog.karajan.workflow.service.commands.Command;
 import org.globus.cog.karajan.workflow.service.commands.ShutdownCommand;
 
 public class Worker implements StatusListener {
@@ -158,14 +159,15 @@
         return this.channelContext;
     }
 
-    public void shutdown() {
+    public ShutdownCallback shutdown() {
         try {
             logger.info("Shutting down worker: " + this);
             KarajanChannel channel = ChannelManager.getManager()
                     .reserveChannel(channelContext);
             ShutdownCommand sc = new ShutdownCommand();
-            sc.execute(channel);
-            logger.info("Worker shut down: " + this);
+            ShutdownCallback stc = new ShutdownCallback();
+            sc.executeAsync(channel, stc);
+            return stc;
         }
         catch (Exception e) {
             logger
@@ -179,6 +181,7 @@
             catch (Exception ee) {
                 logger.info("Failed to cancel worker task", ee);
             }
+            return null;
         }
     }
 
@@ -195,4 +198,32 @@
             shutdownAfter(1);
         }
     }
+
+    public static class ShutdownCallback implements Command.Callback {
+        private boolean done;
+
+        public void errorReceived(Command cmd, String msg, Exception t) {
+            logger.info("Worker shut down failed: " + this + ". " + msg, t);
+            synchronized(this) {
+                done = true;
+                notifyAll();
+            }
+        }
+
+        public void replyReceived(Command cmd) {
+            logger.info("Worker shut down: " + this);
+            synchronized(this) {
+                done = true;
+                notifyAll();
+            }
+        }
+
+        public void waitFor() throws InterruptedException {
+            synchronized(this) {
+                while(!done) {
+                    wait();
+                }
+            }
+        }
+    }
 }

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |