|
From: <ha...@us...> - 2008-04-25 13:28:25
|
Revision: 1989
http://cogkit.svn.sourceforge.net/cogkit/?rev=1989&view=rev
Author: hategan
Date: 2008-04-25 06:28:17 -0700 (Fri, 25 Apr 2008)
Log Message:
-----------
track and account for fallen services
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-25 02:16:23 UTC (rev 1988)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-25 13:28:17 UTC (rev 1989)
@@ -25,6 +25,7 @@
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.coaster.service.ServiceShutdownCommand;
import org.globus.cog.abstraction.coaster.service.local.LocalService;
+import org.globus.cog.abstraction.impl.common.StatusEvent;
import org.globus.cog.abstraction.impl.common.task.ExecutionServiceImpl;
import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
import org.globus.cog.abstraction.impl.common.task.TaskImpl;
@@ -33,12 +34,14 @@
import org.globus.cog.abstraction.interfaces.ExecutionService;
import org.globus.cog.abstraction.interfaces.FileLocation;
import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Status;
+import org.globus.cog.abstraction.interfaces.StatusListener;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.abstraction.interfaces.TaskHandler;
import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
import org.ietf.jgss.GSSCredential;
-public class ServiceManager {
+public class ServiceManager implements StatusListener {
public static final Logger logger = Logger
.getLogger(ServiceManager.class);
@@ -115,6 +118,7 @@
try {
startLocalService();
Task t = buildTask(task);
+ t.addStatusListener(this);
if (logger.isDebugEnabled()) {
logger.debug("Starting coaster service on "
+ task.getService(0) + ". Task is " + t);
@@ -124,7 +128,8 @@
.getAttribute(TASK_ATTR_ID));
synchronized (services) {
services.put(service, url);
- credentials.put(url, task.getService(0).getSecurityContext().getCredentials());
+ credentials.put(url, task.getService(0).getSecurityContext()
+ .getCredentials());
}
return url;
}
@@ -136,6 +141,27 @@
}
}
+ public void statusChanged(StatusEvent event) {
+ Task t = (Task) event.getSource();
+ Status s = event.getStatus();
+ if (s.isTerminal()) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Service task " + t
+ + " terminated. Removing service.");
+ }
+ synchronized (services) {
+ Object service = getService(t);
+ String url = (String) services.remove(service);
+ if (url == null) {
+ logger.info("Service does not appear to be registered with this manager");
+ }
+ else {
+ credentials.remove(url);
+ }
+ }
+ }
+ }
+
private static final Integer ZERO = new Integer(0);
protected void increaseUsageCount(Object service) {
@@ -255,24 +281,26 @@
}
return String.valueOf(r);
}
-
+
private class ServiceReaper extends Thread {
-
+
public ServiceReaper() {
setName("Coaster service reaper");
}
-
+
public void run() {
Iterator i = services.values().iterator();
while (i.hasNext()) {
String url = (String) i.next();
Object cred = credentials.get(url);
try {
- KarajanChannel channel = CoasterChannelManager.getManager()
- .reserveChannel(url, (GSSCredential) cred);
+ KarajanChannel channel = CoasterChannelManager
+ .getManager().reserveChannel(url,
+ (GSSCredential) cred);
ServiceShutdownCommand ssc = new ServiceShutdownCommand();
ssc.execute(channel);
- CoasterChannelManager.getManager().releaseChannel(channel);
+ CoasterChannelManager.getManager()
+ .releaseChannel(channel);
}
catch (Exception e) {
logger.warn("Failed to shut down service " + url, e);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|