|
From: <ha...@us...> - 2008-04-23 17:23:38
|
Revision: 1977
http://cogkit.svn.sourceforge.net/cogkit/?rev=1977&view=rev
Author: hategan
Date: 2008-04-23 10:21:50 -0700 (Wed, 23 Apr 2008)
Log Message:
-----------
added some automatic service shutdown
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
Added Paths:
-----------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownCommand.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -16,6 +16,7 @@
import org.globus.cog.abstraction.coaster.service.job.manager.JobQueue;
import org.globus.cog.abstraction.coaster.service.local.JobStatusHandler;
import org.globus.cog.abstraction.coaster.service.local.RegistrationHandler;
+import org.globus.cog.abstraction.impl.execution.coaster.NotificationManager;
import org.globus.cog.karajan.workflow.service.ConnectionHandler;
import org.globus.cog.karajan.workflow.service.GSSService;
import org.globus.cog.karajan.workflow.service.RequestManager;
@@ -28,11 +29,14 @@
public static final Logger logger = Logger
.getLogger(CoasterService.class);
+ public static final int IDLE_TIMEOUT = 600 * 1000;
+
private String registrationURL, id;
private JobQueue jobQueue;
private LocalTCPService localService;
private Exception e;
private boolean done;
+ private boolean suspended;
public CoasterService() throws IOException {
this(null, null);
@@ -52,13 +56,23 @@
protected void handleConnection(Socket sock) {
logger.debug("Got connection");
- try {
- ConnectionHandler handler = new ConnectionHandler(this, sock,
- new CoasterRequestManager());
- handler.start();
+ if (isSuspended()) {
+ try {
+ sock.close();
+ }
+ catch (IOException e) {
+ logger.warn("Failed to close new connection", e);
+ }
}
- catch (Exception e) {
- logger.warn("Could not start connection handler", e);
+ else {
+ try {
+ ConnectionHandler handler = new ConnectionHandler(this, sock,
+ new CoasterRequestManager());
+ handler.start();
+ }
+ catch (Exception e) {
+ logger.warn("Could not start connection handler", e);
+ }
}
}
@@ -69,7 +83,9 @@
jobQueue = new JobQueue(localService);
jobQueue.start();
localService.setWorkerManager(jobQueue.getWorkerManager());
- logger.info("Started local service: " + localService.getContact());
+ logger
+ .info("Started local service: "
+ + localService.getContact());
if (id != null) {
try {
logger.info("Reserving channel for registration");
@@ -93,9 +109,9 @@
stop(e);
}
}
-
+
private void stop(Exception e) {
- synchronized(this) {
+ synchronized (this) {
this.e = e;
done = true;
notifyAll();
@@ -103,9 +119,10 @@
}
public void waitFor() throws Exception {
- synchronized(this) {
+ synchronized (this) {
while (!done) {
- wait();
+ wait(1000);
+ checkIdleTime();
}
if (e != null) {
throw e;
@@ -113,6 +130,39 @@
}
}
+ /**
+  * Suspends the service once the default NotificationManager reports an
+  * idle time above IDLE_TIMEOUT, then reads the idle time a second time:
+  * if it has dropped below the timeout the service is resumed, otherwise
+  * the service is shut down.
+  */
+ private synchronized void checkIdleTime() {
+     // the notification manager should probably not be a singleton
+     if (NotificationManager.getDefault().getIdleTime() <= IDLE_TIMEOUT) {
+         return;
+     }
+     suspend();
+     long idleAfterSuspend = NotificationManager.getDefault().getIdleTime();
+     if (idleAfterSuspend >= IDLE_TIMEOUT) {
+         logger.info("Idle time exceeded. Shutting down service.");
+         shutdown();
+     }
+     else {
+         resume();
+     }
+ }
+
+ /**
+  * Sets the suspended flag. While the flag is set, handleConnection()
+  * closes newly accepted sockets instead of starting a handler for them.
+  */
+ public synchronized void suspend() {
+     suspended = true;
+ }
+
+ /**
+  * Returns the current value of the suspended flag.
+  */
+ public synchronized boolean isSuspended() {
+     return suspended;
+ }
+
+ /**
+  * Clears the suspended flag.
+  */
+ public synchronized void resume() {
+     suspended = false;
+ }
+
+ /**
+  * Shuts down the underlying service and marks this service as done so
+  * that a thread blocked in waitFor() returns.
+  */
+ public void shutdown() {
+     super.shutdown();
+     //jobQueue.getWorkerManager().shutdown();
+     synchronized (this) {
+         // waitFor() reads 'done' while holding this monitor; write it
+         // under the same monitor so the update is visible to the waiter,
+         // and wake the waiter rather than relying on its 1s poll
+         done = true;
+         notifyAll();
+     }
+ }
+
public JobQueue getJobQueue() {
return jobQueue;
}
@@ -127,7 +177,6 @@
s = new CoasterService(args[0], args[1]);
}
s.start();
- //JobSubmissionTaskHandler.main(new String[0]);
s.waitFor();
System.exit(0);
}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownCommand.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownCommand.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownCommand.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -0,0 +1,19 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 20, 2005
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import org.globus.cog.karajan.workflow.service.commands.Command;
+
+
+/**
+ * Command whose name is ServiceShutdownHandler.NAME.
+ */
+public class ServiceShutdownCommand extends Command {
+    /**
+     * Creates the command, passing ServiceShutdownHandler.NAME to the
+     * Command constructor.
+     */
+    public ServiceShutdownCommand() {
+        super(ServiceShutdownHandler.NAME);
+    }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -0,0 +1,29 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 21, 2005
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import org.globus.cog.karajan.workflow.service.ProtocolException;
+import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
+
+
+/**
+ * Request handler that shuts down the CoasterService owning the channel
+ * the request arrived on.
+ */
+public class ServiceShutdownHandler extends RequestHandler {
+    /** Request name; ServiceShutdownCommand uses it as its command name. */
+    public static final String NAME = "SHUTDOWNSERVICE";
+
+    /**
+     * Looks up the service through the channel context, calls shutdown()
+     * on it and then replies "OK".
+     *
+     * @throws ProtocolException wrapping any exception raised while
+     *         looking up the service, shutting it down or replying
+     */
+    public void requestComplete() throws ProtocolException {
+        try {
+            Object service = getChannel().getChannelContext().getService();
+            ((CoasterService) service).shutdown();
+            sendReply("OK");
+        }
+        catch (Exception ex) {
+            throw new ProtocolException("Failed to shut down service", ex);
+        }
+    }
+}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -357,4 +357,7 @@
this.prototype = prototype;
}
}
+
+ /**
+  * Currently performs no work.
+  */
+ public void shutdown() {
+     // no shutdown logic yet
+ }
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -210,8 +210,8 @@
Task t = new TaskImpl();
t.setType(Task.JOB_SUBMISSION);
JobSpecification js = new JobSpecificationImpl();
- js.setExecutable("/bin/sleep");
- js.addArgument("0");
+ js.setExecutable("/bin/echo");
+ //js.addArgument("0");
t.setSpecification(js);
ExecutionService s = new ExecutionServiceImpl();
s.setServiceContact(new ServiceContactImpl("localhost"));
@@ -256,9 +256,11 @@
System.err.println("All " + ts.length + " jobs done");
System.err.println("Total time: "
+ (System.currentTimeMillis() - s));
+ System.exit(0);
}
catch (Exception e) {
e.printStackTrace();
+ System.exit(1);
}
}
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -18,6 +18,12 @@
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.Task;
+/**
+ * This class is used to keep track of tasks sent
+ * to workers, since the worker is only aware of the
+ * task ID. A notification from a worker needs to
+ * be coupled with a Task object based on the ID.
+ */
public class NotificationManager {
public static final Logger logger = Logger
.getLogger(NotificationManager.class);
@@ -33,6 +39,7 @@
private Map tasks;
private Map pending;
+ private long lastNotificationTime;
public NotificationManager() {
tasks = new HashMap();
@@ -52,7 +59,7 @@
}
}
}
-
+
public void notificationReceived(String id, Status s) {
Task task;
synchronized (tasks) {
@@ -72,8 +79,28 @@
}
}
}
+
+ /**
+  * Returns how long, in milliseconds, this manager has been idle.
+  *
+  * @return 0 while at least one task is registered; otherwise the time
+  *         elapsed since the last status notification, or since the first
+  *         call to this method if no notification has been seen yet
+  */
+ public long getIdleTime() {
+     synchronized (tasks) {
+         if (tasks.size() != 0) {
+             return 0;
+         }
+         long now = System.currentTimeMillis();
+         if (lastNotificationTime == 0) {
+             // The field defaults to 0 and is only assigned in setStatus().
+             // Without this, a manager that has not yet seen a notification
+             // would report the time since the epoch, which exceeds any
+             // idle timeout immediately. Start the idle clock now instead.
+             lastNotificationTime = now;
+         }
+         return now - lastNotificationTime;
+     }
+ }
+
+ /**
+  * Returns the number of entries in the task map, read while holding the
+  * map's monitor.
+  */
+ public int getActiveTaskCount() {
+     int count;
+     synchronized (tasks) {
+         count = tasks.size();
+     }
+     return count;
+ }
private void setStatus(Task t, Status s) {
+ synchronized(tasks) {
+ lastNotificationTime = System.currentTimeMillis();
+ }
try {
t.setStatus(s);
}
@@ -90,5 +117,4 @@
}
p.addLast(status);
}
-
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -18,10 +18,12 @@
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.coaster.service.ServiceShutdownCommand;
import org.globus.cog.abstraction.coaster.service.local.LocalService;
import org.globus.cog.abstraction.impl.common.task.ExecutionServiceImpl;
import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
@@ -33,6 +35,8 @@
import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.abstraction.interfaces.TaskHandler;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+import org.ietf.jgss.GSSCredential;
public class ServiceManager {
public static final Logger logger = Logger
@@ -43,7 +47,6 @@
public static final String BOOTSTRAP_LIST = "coaster-bootstrap.list";
public static final String TASK_ATTR_ID = "coaster:serviceid";
- // public static final String BOOTSTRAP_SCRIPT = "test.sh";
private static ServiceManager defaultManager;
@@ -58,13 +61,18 @@
private BootstrapService bootstrapService;
private LocalService localService;
private Map services;
+ private Map credentials;
private Set starting;
private Map usageCount;
+ private ServiceReaper serviceReaper;
public ServiceManager() {
services = new HashMap();
+ credentials = new HashMap();
starting = new HashSet();
usageCount = new HashMap();
+ serviceReaper = new ServiceReaper();
+ Runtime.getRuntime().addShutdownHook(serviceReaper);
}
public String reserveService(Task task, TaskHandler bootHandler)
@@ -116,6 +124,7 @@
.getAttribute(TASK_ATTR_ID));
synchronized (services) {
services.put(service, url);
+ credentials.put(url, task.getService(0).getSecurityContext().getCredentials());
}
return url;
}
@@ -246,4 +255,29 @@
}
return String.valueOf(r);
}
+
+ /**
+  * Thread that the ServiceManager constructor registers as a JVM shutdown
+  * hook. For every known service URL it reserves a channel with the
+  * credential stored for that URL and executes a ServiceShutdownCommand on
+  * it. A failure for one service is logged and does not stop the others.
+  */
+ private class ServiceReaper extends Thread {
+
+     public ServiceReaper() {
+         setName("Coaster service reaper");
+     }
+
+     public void run() {
+         Object[] urls;
+         Map creds;
+         // Both maps are modified while holding the services lock. Copy
+         // them under that lock so a concurrent update cannot break the
+         // iteration, and so the lock is not held during the remote calls.
+         synchronized (services) {
+             urls = services.values().toArray();
+             creds = new HashMap(credentials);
+         }
+         for (int i = 0; i < urls.length; i++) {
+             String url = (String) urls[i];
+             try {
+                 KarajanChannel channel = CoasterChannelManager.getManager()
+                         .reserveChannel(url, (GSSCredential) creds.get(url));
+                 ServiceShutdownCommand ssc = new ServiceShutdownCommand();
+                 ssc.execute(channel);
+             }
+             catch (Exception e) {
+                 logger.warn("Failed to shut down service " + url, e);
+             }
+         }
+     }
+
+ }
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|