|
From: <ha...@us...> - 2008-04-22 20:13:46
|
Revision: 1972
http://cogkit.svn.sourceforge.net/cogkit/?rev=1972&view=rev
Author: hategan
Date: 2008-04-22 13:13:20 -0700 (Tue, 22 Apr 2008)
Log Message:
-----------
added some logging
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java 2008-04-22 18:13:19 UTC (rev 1971)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java 2008-04-22 20:13:20 UTC (rev 1972)
@@ -34,6 +34,9 @@
}
public void start() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Starting local service");
+ }
setAuthorization(new SelfAuthorization());
services = new HashMap();
this.accept = true;
@@ -63,6 +66,9 @@
public String waitForRegistration(Task t, String id, long timeout)
throws InterruptedException, IOException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Waiting for registration from service " + id);
+ }
long start = System.currentTimeMillis();
synchronized (services) {
while (!services.containsKey(id)) {
@@ -83,6 +89,9 @@
}
public void registrationReceived(String id, String url, KarajanChannel channel) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received registration from service " + id + ": " + url);
+ }
synchronized (services) {
if (services.containsKey(id)) {
throw new IllegalArgumentException(
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java 2008-04-22 18:13:19 UTC (rev 1971)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java 2008-04-22 20:13:20 UTC (rev 1972)
@@ -287,5 +287,4 @@
System.err.println(message);
System.exit(1);
}
-
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-22 18:13:19 UTC (rev 1971)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-22 20:13:20 UTC (rev 1972)
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.log4j.Logger;
import org.globus.cog.abstraction.coaster.service.local.LocalService;
import org.globus.cog.abstraction.impl.common.task.ExecutionServiceImpl;
import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
@@ -34,6 +35,9 @@
import org.globus.cog.abstraction.interfaces.TaskHandler;
public class ServiceManager {
+ public static final Logger logger = Logger
+ .getLogger(ServiceManager.class);
+
public static final String BOOTSTRAP_SCRIPT = "bootstrap.sh";
public static final String BOOTSTRAP_JAR = "coaster-bootstrap.jar";
public static final String BOOTSTRAP_LIST = "coaster-bootstrap.list";
@@ -65,6 +69,9 @@
public String reserveService(Task task, TaskHandler bootHandler)
throws TaskSubmissionException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Reserving service for " + task);
+ }
try {
Object service = getService(task);
// beah. it's impossible to nicely abstract both concurrency
@@ -100,6 +107,10 @@
try {
startLocalService();
Task t = buildTask(task);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Starting coaster service on "
+ + task.getService(0) + ". Task is " + t);
+ }
bootHandler.submit(t);
String url = localService.waitForRegistration(t, (String) t
.getAttribute(TASK_ATTR_ID));
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-23 17:23:38
|
Revision: 1977
http://cogkit.svn.sourceforge.net/cogkit/?rev=1977&view=rev
Author: hategan
Date: 2008-04-23 10:21:50 -0700 (Wed, 23 Apr 2008)
Log Message:
-----------
added some automatic service shutdown
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
Added Paths:
-----------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownCommand.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -16,6 +16,7 @@
import org.globus.cog.abstraction.coaster.service.job.manager.JobQueue;
import org.globus.cog.abstraction.coaster.service.local.JobStatusHandler;
import org.globus.cog.abstraction.coaster.service.local.RegistrationHandler;
+import org.globus.cog.abstraction.impl.execution.coaster.NotificationManager;
import org.globus.cog.karajan.workflow.service.ConnectionHandler;
import org.globus.cog.karajan.workflow.service.GSSService;
import org.globus.cog.karajan.workflow.service.RequestManager;
@@ -28,11 +29,14 @@
public static final Logger logger = Logger
.getLogger(CoasterService.class);
+ public static final int IDLE_TIMEOUT = 600 * 1000;
+
private String registrationURL, id;
private JobQueue jobQueue;
private LocalTCPService localService;
private Exception e;
private boolean done;
+ private boolean suspended;
public CoasterService() throws IOException {
this(null, null);
@@ -52,13 +56,23 @@
protected void handleConnection(Socket sock) {
logger.debug("Got connection");
- try {
- ConnectionHandler handler = new ConnectionHandler(this, sock,
- new CoasterRequestManager());
- handler.start();
+ if (isSuspended()) {
+ try {
+ sock.close();
+ }
+ catch (IOException e) {
+ logger.warn("Failed to close new connection", e);
+ }
}
- catch (Exception e) {
- logger.warn("Could not start connection handler", e);
+ else {
+ try {
+ ConnectionHandler handler = new ConnectionHandler(this, sock,
+ new CoasterRequestManager());
+ handler.start();
+ }
+ catch (Exception e) {
+ logger.warn("Could not start connection handler", e);
+ }
}
}
@@ -69,7 +83,9 @@
jobQueue = new JobQueue(localService);
jobQueue.start();
localService.setWorkerManager(jobQueue.getWorkerManager());
- logger.info("Started local service: " + localService.getContact());
+ logger
+ .info("Started local service: "
+ + localService.getContact());
if (id != null) {
try {
logger.info("Reserving channel for registration");
@@ -93,9 +109,9 @@
stop(e);
}
}
-
+
private void stop(Exception e) {
- synchronized(this) {
+ synchronized (this) {
this.e = e;
done = true;
notifyAll();
@@ -103,9 +119,10 @@
}
public void waitFor() throws Exception {
- synchronized(this) {
+ synchronized (this) {
while (!done) {
- wait();
+ wait(1000);
+ checkIdleTime();
}
if (e != null) {
throw e;
@@ -113,6 +130,39 @@
}
}
+ private synchronized void checkIdleTime() {
+ // the notification manager should probably not be a singleton
+ long idleTime = NotificationManager.getDefault().getIdleTime();
+ if (idleTime > IDLE_TIMEOUT) {
+ suspend();
+ if (NotificationManager.getDefault().getIdleTime() < IDLE_TIMEOUT) {
+ resume();
+ }
+ else {
+ logger.info("Idle time exceeded. Shutting down service.");
+ shutdown();
+ }
+ }
+ }
+
+ public synchronized void suspend() {
+ this.suspended = true;
+ }
+
+ public synchronized boolean isSuspended() {
+ return suspended;
+ }
+
+ public synchronized void resume() {
+ this.suspended = false;
+ }
+
+ public void shutdown() {
+ super.shutdown();
+ //jobQueue.getWorkerManager().shutdown();
+ done = true;
+ }
+
public JobQueue getJobQueue() {
return jobQueue;
}
@@ -127,7 +177,6 @@
s = new CoasterService(args[0], args[1]);
}
s.start();
- //JobSubmissionTaskHandler.main(new String[0]);
s.waitFor();
System.exit(0);
}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownCommand.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownCommand.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownCommand.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -0,0 +1,19 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 20, 2005
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import org.globus.cog.karajan.workflow.service.commands.Command;
+
+
+public class ServiceShutdownCommand extends Command {
+ public ServiceShutdownCommand() {
+ super(ServiceShutdownHandler.NAME);
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -0,0 +1,29 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 21, 2005
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import org.globus.cog.karajan.workflow.service.ProtocolException;
+import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
+
+
+public class ServiceShutdownHandler extends RequestHandler {
+ public static final String NAME = "SHUTDOWNSERVICE";
+
+ public void requestComplete() throws ProtocolException {
+ try {
+ CoasterService cs = (CoasterService) getChannel().getChannelContext().getService();
+ cs.shutdown();
+ sendReply("OK");
+ }
+ catch (Exception e) {
+ throw new ProtocolException("Failed to shut down service", e);
+ }
+ }
+}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -357,4 +357,7 @@
this.prototype = prototype;
}
}
+
+ public void shutdown() {
+ }
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -210,8 +210,8 @@
Task t = new TaskImpl();
t.setType(Task.JOB_SUBMISSION);
JobSpecification js = new JobSpecificationImpl();
- js.setExecutable("/bin/sleep");
- js.addArgument("0");
+ js.setExecutable("/bin/echo");
+ //js.addArgument("0");
t.setSpecification(js);
ExecutionService s = new ExecutionServiceImpl();
s.setServiceContact(new ServiceContactImpl("localhost"));
@@ -256,9 +256,11 @@
System.err.println("All " + ts.length + " jobs done");
System.err.println("Total time: "
+ (System.currentTimeMillis() - s));
+ System.exit(0);
}
catch (Exception e) {
e.printStackTrace();
+ System.exit(1);
}
}
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -18,6 +18,12 @@
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.Task;
+/**
+ * This class is used to keep track of tasks sent
+ * to workers, since the worker is only aware of the
+ * task ID. A notification from a worker needs to
+ * be coupled with a Task object based on the ID.
+ */
public class NotificationManager {
public static final Logger logger = Logger
.getLogger(NotificationManager.class);
@@ -33,6 +39,7 @@
private Map tasks;
private Map pending;
+ private long lastNotificationTime;
public NotificationManager() {
tasks = new HashMap();
@@ -52,7 +59,7 @@
}
}
}
-
+
public void notificationReceived(String id, Status s) {
Task task;
synchronized (tasks) {
@@ -72,8 +79,28 @@
}
}
}
+
+ public long getIdleTime() {
+ synchronized(tasks) {
+ if (tasks.size() == 0) {
+ return System.currentTimeMillis() - lastNotificationTime;
+ }
+ else {
+ return 0;
+ }
+ }
+ }
+
+ public int getActiveTaskCount() {
+ synchronized(tasks) {
+ return tasks.size();
+ }
+ }
private void setStatus(Task t, Status s) {
+ synchronized(tasks) {
+ lastNotificationTime = System.currentTimeMillis();
+ }
try {
t.setStatus(s);
}
@@ -90,5 +117,4 @@
}
p.addLast(status);
}
-
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -18,10 +18,12 @@
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.coaster.service.ServiceShutdownCommand;
import org.globus.cog.abstraction.coaster.service.local.LocalService;
import org.globus.cog.abstraction.impl.common.task.ExecutionServiceImpl;
import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
@@ -33,6 +35,8 @@
import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.abstraction.interfaces.TaskHandler;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+import org.ietf.jgss.GSSCredential;
public class ServiceManager {
public static final Logger logger = Logger
@@ -43,7 +47,6 @@
public static final String BOOTSTRAP_LIST = "coaster-bootstrap.list";
public static final String TASK_ATTR_ID = "coaster:serviceid";
- // public static final String BOOTSTRAP_SCRIPT = "test.sh";
private static ServiceManager defaultManager;
@@ -58,13 +61,18 @@
private BootstrapService bootstrapService;
private LocalService localService;
private Map services;
+ private Map credentials;
private Set starting;
private Map usageCount;
+ private ServiceReaper serviceReaper;
public ServiceManager() {
services = new HashMap();
+ credentials = new HashMap();
starting = new HashSet();
usageCount = new HashMap();
+ serviceReaper = new ServiceReaper();
+ Runtime.getRuntime().addShutdownHook(serviceReaper);
}
public String reserveService(Task task, TaskHandler bootHandler)
@@ -116,6 +124,7 @@
.getAttribute(TASK_ATTR_ID));
synchronized (services) {
services.put(service, url);
+ credentials.put(url, task.getService(0).getSecurityContext().getCredentials());
}
return url;
}
@@ -246,4 +255,29 @@
}
return String.valueOf(r);
}
+
+ private class ServiceReaper extends Thread {
+
+ public ServiceReaper() {
+ setName("Coaster service reaper");
+ }
+
+ public void run() {
+ Iterator i = services.values().iterator();
+ while (i.hasNext()) {
+ String url = (String) i.next();
+ Object cred = credentials.get(url);
+ try {
+ KarajanChannel channel = CoasterChannelManager.getManager()
+ .reserveChannel(url, (GSSCredential) cred);
+ ServiceShutdownCommand ssc = new ServiceShutdownCommand();
+ ssc.execute(channel);
+ }
+ catch (Exception e) {
+ logger.warn("Failed to shut down service " + url, e);
+ }
+ }
+ }
+
+ }
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-24 16:14:49
|
Revision: 1978
http://cogkit.svn.sourceforge.net/cogkit/?rev=1978&view=rev
Author: hategan
Date: 2008-04-24 09:14:29 -0700 (Thu, 24 Apr 2008)
Log Message:
-----------
more of automatic shutdown
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java 2008-04-23 17:21:50 UTC (rev 1977)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java 2008-04-24 16:14:29 UTC (rev 1978)
@@ -21,5 +21,6 @@
addHandler("CHANNELCONFIG", ChannelConfigurationHandler.class);
addHandler("SHUTDOWN", ShutdownHandler.class);
addHandler(SubmitJobCommand.NAME, SubmitJobHandler.class);
+ addHandler(ServiceShutdownHandler.NAME, ServiceShutdownHandler.class);
}
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-04-23 17:21:50 UTC (rev 1977)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-04-24 16:14:29 UTC (rev 1978)
@@ -159,7 +159,7 @@
public void shutdown() {
super.shutdown();
- //jobQueue.getWorkerManager().shutdown();
+ jobQueue.getWorkerManager().shutdown();
done = true;
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java 2008-04-23 17:21:50 UTC (rev 1977)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java 2008-04-24 16:14:29 UTC (rev 1978)
@@ -19,8 +19,8 @@
public void requestComplete() throws ProtocolException {
try {
CoasterService cs = (CoasterService) getChannel().getChannelContext().getService();
+ sendReply("OK");
cs.shutdown();
- sendReply("OK");
}
catch (Exception e) {
throw new ProtocolException("Failed to shut down service", e);
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java 2008-04-23 17:21:50 UTC (rev 1977)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java 2008-04-24 16:14:29 UTC (rev 1978)
@@ -9,14 +9,20 @@
*/
package org.globus.cog.abstraction.coaster.service.job.manager;
+import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.common.StatusEvent;
import org.globus.cog.abstraction.impl.common.StatusImpl;
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.StatusListener;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
+import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+import org.globus.cog.karajan.workflow.service.commands.ShutdownCommand;
public class Worker implements StatusListener {
+ public static final Logger logger = Logger.getLogger(Worker.class);
+
private Task task, running;
private String id;
private WorkerManager manager;
@@ -25,7 +31,7 @@
private int maxWallTime;
private Status error;
private ChannelContext channelContext;
-
+
private static final Long NEVER = new Long(Long.MAX_VALUE);
public Worker(WorkerManager manager, String id, int maxWallTime, Task w,
@@ -101,11 +107,11 @@
public void setScheduledTerminationTime(long l) {
this.scheduledTerminationTime = new Long(l);
}
-
+
public Status getStatus() {
- return error;
+ return error;
}
-
+
public String toString() {
return "Worker[" + id + "]";
}
@@ -113,8 +119,20 @@
public void setChannelContext(ChannelContext cc) {
this.channelContext = cc;
}
-
+
public ChannelContext getChannelContext() {
return this.channelContext;
}
+
+ public void shutdown() {
+ try {
+ KarajanChannel channel = ChannelManager.getManager()
+ .reserveChannel(channelContext);
+ ShutdownCommand sc = new ShutdownCommand();
+ sc.execute(channel);
+ }
+ catch (Exception e) {
+ logger.warn("Failed to shut down worker", e);
+ }
+ }
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-23 17:21:50 UTC (rev 1977)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-24 16:14:29 UTC (rev 1978)
@@ -359,5 +359,10 @@
}
public void shutdown() {
+ Iterator i = ready.values().iterator();
+ while (i.hasNext()) {
+ Worker wr = (Worker) i.next();
+ wr.shutdown();
+ }
}
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-23 17:21:50 UTC (rev 1977)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-24 16:14:29 UTC (rev 1978)
@@ -272,6 +272,7 @@
.reserveChannel(url, (GSSCredential) cred);
ServiceShutdownCommand ssc = new ServiceShutdownCommand();
ssc.execute(channel);
+ CoasterChannelManager.getManager().releaseChannel(channel);
}
catch (Exception e) {
logger.warn("Failed to shut down service " + url, e);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-25 02:16:25
|
Revision: 1988
http://cogkit.svn.sourceforge.net/cogkit/?rev=1988&view=rev
Author: hategan
Date: 2008-04-24 19:16:23 -0700 (Thu, 24 Apr 2008)
Log Message:
-----------
fixed walltime spec; shutdown of queued workers; fixed idle race
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -9,21 +9,26 @@
*/
package org.globus.cog.abstraction.coaster.service;
+import org.apache.log4j.Logger;
import org.globus.cog.karajan.workflow.service.ProtocolException;
import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
+public class ServiceShutdownHandler extends RequestHandler {
+ public static final Logger logger = Logger
+ .getLogger(ServiceShutdownHandler.class);
-public class ServiceShutdownHandler extends RequestHandler {
public static final String NAME = "SHUTDOWNSERVICE";
-
- public void requestComplete() throws ProtocolException {
- try {
- CoasterService cs = (CoasterService) getChannel().getChannelContext().getService();
- sendReply("OK");
- cs.shutdown();
- }
- catch (Exception e) {
- throw new ProtocolException("Failed to shut down service", e);
- }
- }
+
+ public void requestComplete() throws ProtocolException {
+ try {
+ CoasterService cs = (CoasterService) getChannel()
+ .getChannelContext().getService();
+ sendReply("OK");
+ cs.shutdown();
+ }
+ catch (Exception e) {
+ logger.warn("Failed to shut down service", e);
+ throw new ProtocolException("Failed to shut down service", e);
+ }
+ }
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -72,6 +72,10 @@
manager.workerTerminated(this);
}
}
+
+ public Task getWorkerTask() {
+ return task;
+ }
public Task getRunning() {
return running;
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -14,6 +14,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -147,7 +148,9 @@
private void startWorker(int maxWallTime, Task prototype)
throws InvalidServiceContactException {
int id = sr.nextInt();
- System.err.println("Starting worker with id=" + id);
+ if (logger.isInfoEnabled()) {
+ logger.info("Starting worker with id=" + id + " and maxwalltime=" + maxWallTime + "s");
+ }
String sid = String.valueOf(id);
Task t = new TaskImpl();
t.setType(Task.JOB_SUBMISSION);
@@ -177,7 +180,8 @@
js.addArgument(callbackURI.toString());
js.setStdOutputLocation(FileLocation.MEMORY);
js.setStdErrorLocation(FileLocation.MEMORY);
- js.setAttribute("maxwalltime", new WallTime(maxWallTime).getSpecInMinutes());
+ js.setAttribute("maxwalltime", new WallTime(maxWallTime)
+ .getSpecInMinutes());
return js;
}
@@ -188,7 +192,7 @@
ExecutionService p = (ExecutionService) prototype.getService(0);
String jm = p.getJobManager();
int colon = jm.indexOf(':');
- //remove provider used to bootstrap coasters
+ // remove provider used to bootstrap coasters
jm = jm.substring(colon + 1);
colon = jm.indexOf(':');
if (colon == -1) {
@@ -266,8 +270,8 @@
}
synchronized (allocationRequests) {
if (allocationRequests.size() < MAX_STARTING_WORKERS) {
- allocationRequests.add(new AllocationRequest(maxWallTime,
- prototype));
+ allocationRequests.add(new AllocationRequest(
+ maxWallTime, prototype));
allocationRequests.notify();
}
else {
@@ -368,10 +372,24 @@
}
public void shutdown() {
- Iterator i = ready.values().iterator();
- while (i.hasNext()) {
- Worker wr = (Worker) i.next();
- wr.shutdown();
+ synchronized (this) {
+ Iterator i;
+ i = ready.values().iterator();
+ while (i.hasNext()) {
+ Worker wr = (Worker) i.next();
+ wr.shutdown();
+ }
+ i = new ArrayList(requested.values()).iterator();
+ while (i.hasNext()) {
+ Worker wr = (Worker) i.next();
+ try {
+ handler.cancel(wr.getWorkerTask());
+ }
+ catch (Exception e) {
+ logger.warn("Failed to cancel queued worker task "
+ + wr.getWorkerTask(), e);
+ }
+ }
}
}
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -234,7 +234,7 @@
public static void main(String[] args) {
try {
long s = System.currentTimeMillis();
- Task[] ts = new Task[2];
+ Task[] ts = new Task[512];
for (int i = 0; i < ts.length; i++) {
ts[i] = submitTask();
if (i % 100 == 0) {
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -69,6 +69,7 @@
else {
task = (Task) tasks.get(id);
}
+ lastNotificationTime = System.currentTimeMillis();
}
if (task != null) {
setStatus(task, s);
@@ -82,7 +83,7 @@
public long getIdleTime() {
synchronized(tasks) {
- if (tasks.size() == 0) {
+ if (tasks.size() == 0 && lastNotificationTime != 0) {
return System.currentTimeMillis() - lastNotificationTime;
}
else {
@@ -98,9 +99,6 @@
}
private void setStatus(Task t, Status s) {
- synchronized(tasks) {
- lastNotificationTime = System.currentTimeMillis();
- }
try {
t.setStatus(s);
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-30 02:28:27
|
Revision: 1992
http://cogkit.svn.sourceforge.net/cogkit/?rev=1992&view=rev
Author: hategan
Date: 2008-04-29 19:28:21 -0700 (Tue, 29 Apr 2008)
Log Message:
-----------
removed redirection (to make it useable with ws-gram)
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-29 12:37:24 UTC (rev 1991)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-30 02:28:21 UTC (rev 1992)
@@ -34,7 +34,6 @@
import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
import org.globus.cog.abstraction.impl.common.task.TaskImpl;
import org.globus.cog.abstraction.interfaces.ExecutionService;
-import org.globus.cog.abstraction.interfaces.FileLocation;
import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.Task;
@@ -178,8 +177,6 @@
js.addArgument(script.getAbsolutePath());
js.addArgument(id);
js.addArgument(callbackURI.toString());
- js.setStdOutputLocation(FileLocation.MEMORY);
- js.setStdErrorLocation(FileLocation.MEMORY);
js.setAttribute("maxwalltime", new WallTime(maxWallTime)
.getSpecInMinutes());
return js;
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-29 12:37:24 UTC (rev 1991)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-30 02:28:21 UTC (rev 1992)
@@ -201,8 +201,6 @@
s.setServiceContact(orig.getService(0).getServiceContact());
s.setJobManager("fork");
t.setService(0, s);
- js.setStdOutputLocation(FileLocation.MEMORY);
- js.setStdErrorLocation(FileLocation.MEMORY);
return t;
}
catch (Exception e) {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|