|
From: <ha...@us...> - 2008-04-25 02:16:25
|
Revision: 1988
http://cogkit.svn.sourceforge.net/cogkit/?rev=1988&view=rev
Author: hategan
Date: 2008-04-24 19:16:23 -0700 (Thu, 24 Apr 2008)
Log Message:
-----------
fixed walltime spec; shutdown of queued workers; fixed idle race
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -9,21 +9,26 @@
*/
package org.globus.cog.abstraction.coaster.service;
+import org.apache.log4j.Logger;
import org.globus.cog.karajan.workflow.service.ProtocolException;
import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
+public class ServiceShutdownHandler extends RequestHandler {
+ public static final Logger logger = Logger
+ .getLogger(ServiceShutdownHandler.class);
-public class ServiceShutdownHandler extends RequestHandler {
public static final String NAME = "SHUTDOWNSERVICE";
-
- public void requestComplete() throws ProtocolException {
- try {
- CoasterService cs = (CoasterService) getChannel().getChannelContext().getService();
- sendReply("OK");
- cs.shutdown();
- }
- catch (Exception e) {
- throw new ProtocolException("Failed to shut down service", e);
- }
- }
+
+ public void requestComplete() throws ProtocolException {
+ try {
+ CoasterService cs = (CoasterService) getChannel()
+ .getChannelContext().getService();
+ sendReply("OK");
+ cs.shutdown();
+ }
+ catch (Exception e) {
+ logger.warn("Failed to shut down service", e);
+ throw new ProtocolException("Failed to shut down service", e);
+ }
+ }
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -72,6 +72,10 @@
manager.workerTerminated(this);
}
}
+
+ public Task getWorkerTask() {
+ return task;
+ }
public Task getRunning() {
return running;
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -14,6 +14,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -147,7 +148,9 @@
private void startWorker(int maxWallTime, Task prototype)
throws InvalidServiceContactException {
int id = sr.nextInt();
- System.err.println("Starting worker with id=" + id);
+ if (logger.isInfoEnabled()) {
+ logger.info("Starting worker with id=" + id + " and maxwalltime=" + maxWallTime + "s");
+ }
String sid = String.valueOf(id);
Task t = new TaskImpl();
t.setType(Task.JOB_SUBMISSION);
@@ -177,7 +180,8 @@
js.addArgument(callbackURI.toString());
js.setStdOutputLocation(FileLocation.MEMORY);
js.setStdErrorLocation(FileLocation.MEMORY);
- js.setAttribute("maxwalltime", new WallTime(maxWallTime).getSpecInMinutes());
+ js.setAttribute("maxwalltime", new WallTime(maxWallTime)
+ .getSpecInMinutes());
return js;
}
@@ -188,7 +192,7 @@
ExecutionService p = (ExecutionService) prototype.getService(0);
String jm = p.getJobManager();
int colon = jm.indexOf(':');
- //remove provider used to bootstrap coasters
+ // remove provider used to bootstrap coasters
jm = jm.substring(colon + 1);
colon = jm.indexOf(':');
if (colon == -1) {
@@ -266,8 +270,8 @@
}
synchronized (allocationRequests) {
if (allocationRequests.size() < MAX_STARTING_WORKERS) {
- allocationRequests.add(new AllocationRequest(maxWallTime,
- prototype));
+ allocationRequests.add(new AllocationRequest(
+ maxWallTime, prototype));
allocationRequests.notify();
}
else {
@@ -368,10 +372,24 @@
}
public void shutdown() {
- Iterator i = ready.values().iterator();
- while (i.hasNext()) {
- Worker wr = (Worker) i.next();
- wr.shutdown();
+ synchronized (this) {
+ Iterator i;
+ i = ready.values().iterator();
+ while (i.hasNext()) {
+ Worker wr = (Worker) i.next();
+ wr.shutdown();
+ }
+ i = new ArrayList(requested.values()).iterator();
+ while (i.hasNext()) {
+ Worker wr = (Worker) i.next();
+ try {
+ handler.cancel(wr.getWorkerTask());
+ }
+ catch (Exception e) {
+ logger.warn("Failed to cancel queued worker task "
+ + wr.getWorkerTask(), e);
+ }
+ }
}
}
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -234,7 +234,7 @@
public static void main(String[] args) {
try {
long s = System.currentTimeMillis();
- Task[] ts = new Task[2];
+ Task[] ts = new Task[512];
for (int i = 0; i < ts.length; i++) {
ts[i] = submitTask();
if (i % 100 == 0) {
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -69,6 +69,7 @@
else {
task = (Task) tasks.get(id);
}
+ lastNotificationTime = System.currentTimeMillis();
}
if (task != null) {
setStatus(task, s);
@@ -82,7 +83,7 @@
public long getIdleTime() {
synchronized(tasks) {
- if (tasks.size() == 0) {
+ if (tasks.size() == 0 && lastNotificationTime != 0) {
return System.currentTimeMillis() - lastNotificationTime;
}
else {
@@ -98,9 +99,6 @@
}
private void setStatus(Task t, Status s) {
- synchronized(tasks) {
- lastNotificationTime = System.currentTimeMillis();
- }
try {
t.setStatus(s);
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|