You can subscribe to this list here.
| 2006 | Jan | Feb | Mar | Apr (39) | May (165) | Jun (164) | Jul (127) | Aug (81) | Sep (146) | Oct (375) | Nov (241) | Dec (77) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2007 | Jan (42) | Feb (38) | Mar (30) | Apr (6) | May (17) | Jun | Jul (15) | Aug (59) | Sep (31) | Oct (44) | Nov (30) | Dec (12) |
| 2008 | Jan (9) | Feb (63) | Mar (18) | Apr (43) | May (28) | Jun (32) | Jul (61) | Aug (5) | Sep (72) | Oct (48) | Nov (6) | Dec |
|
From: <ha...@us...> - 2008-05-05 08:27:45
|
Revision: 2000
http://cogkit.svn.sourceforge.net/cogkit/?rev=2000&view=rev
Author: hategan
Date: 2008-05-05 01:27:40 -0700 (Mon, 05 May 2008)
Log Message:
-----------
include the actual code for unknown codes
Modified Paths:
--------------
trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/StatusImpl.java
Modified: trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/StatusImpl.java
===================================================================
--- trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/StatusImpl.java 2008-05-02 20:40:21 UTC (rev 1999)
+++ trunk/current/src/cog/modules/abstraction-common/src/org/globus/cog/abstraction/impl/common/StatusImpl.java 2008-05-05 08:27:40 UTC (rev 2000)
@@ -123,7 +123,7 @@
return "Unsubmitted";
default:
- return "Unknown";
+ return "Unknown (" + statusCode + ")";
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-05-02 20:40:24
|
Revision: 1999
http://cogkit.svn.sourceforge.net/cogkit/?rev=1999&view=rev
Author: hategan
Date: 2008-05-02 13:40:21 -0700 (Fri, 02 May 2008)
Log Message:
-----------
expose task stacks to descendants
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/AbstractGridNode.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/AbstractGridNode.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/AbstractGridNode.java 2008-04-30 23:36:25 UTC (rev 1998)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/AbstractGridNode.java 2008-05-02 20:40:21 UTC (rev 1999)
@@ -148,6 +148,7 @@
public static final Arg A_SECURITY_CONTEXT = new Arg.TypedPositional("securityContext",
SecurityContext.class, "Security Context");
+
public void setSecurityContext(VariableStack stack, Service service) throws ExecutionException {
if (A_SECURITY_CONTEXT.isPresent(stack)) {
SecurityContext sc = (SecurityContext) A_SECURITY_CONTEXT.getValue(stack);
@@ -188,7 +189,7 @@
}
}
}
-
+
public void submitUnscheduled(TaskHandler handler, Task task, VariableStack stack)
throws ExecutionException {
setTaskIdentity(stack, task);
@@ -213,7 +214,7 @@
}
}
}
-
+
public void submitScheduled(Scheduler scheduler, Task task, VariableStack stack,
Object constraints) {
setTaskIdentity(stack, task);
@@ -227,7 +228,7 @@
}
scheduler.enqueue(task, constraints);
}
-
+
protected void setTaskIdentity(VariableStack stack, Task task) {
try {
task.setIdentity(new IdentityImpl(ThreadingContext.get(stack).toString()));
@@ -237,6 +238,12 @@
}
}
+ protected VariableStack getStack(Task t) {
+ synchronized (tasks) {
+ return (VariableStack) tasks.get(t);
+ }
+ }
+
public void statusChanged(StatusEvent e) {
try {
int status = e.getStatus().getStatusCode();
@@ -247,10 +254,7 @@
return;
}
Task task = (Task) e.getSource();
- VariableStack stack;
- synchronized (tasks) {
- stack = (VariableStack) tasks.get(task);
- }
+ VariableStack stack = getStack(task);
if (stack == null) {
logger.warn("Received status event from unknown task " + e.getSource());
return;
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-30 23:36:33
|
Revision: 1998
http://cogkit.svn.sourceforge.net/cogkit/?rev=1998&view=rev
Author: hategan
Date: 2008-04-30 16:36:25 -0700 (Wed, 30 Apr 2008)
Log Message:
-----------
updated changes
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/CHANGES.txt
Modified: trunk/current/src/cog/modules/karajan/CHANGES.txt
===================================================================
--- trunk/current/src/cog/modules/karajan/CHANGES.txt 2008-04-30 23:36:09 UTC (rev 1997)
+++ trunk/current/src/cog/modules/karajan/CHANGES.txt 2008-04-30 23:36:25 UTC (rev 1998)
@@ -1,3 +1,8 @@
+(04/30/2008)
+
+*** Added raceFor. It's a blend between race() and for(). It will
+ iterate in parallel until the first iteration completes.
+
(04/29/2008)
*** WeightHostScheduler initial score and load factor can now be set
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-30 23:36:12
|
Revision: 1997
http://cogkit.svn.sourceforge.net/cogkit/?rev=1997&view=rev
Author: hategan
Date: 2008-04-30 16:36:09 -0700 (Wed, 30 Apr 2008)
Log Message:
-----------
added raceFor
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/resources/sys-common.xml
Modified: trunk/current/src/cog/modules/karajan/resources/sys-common.xml
===================================================================
--- trunk/current/src/cog/modules/karajan/resources/sys-common.xml 2008-04-30 23:35:34 UTC (rev 1996)
+++ trunk/current/src/cog/modules/karajan/resources/sys-common.xml 2008-04-30 23:36:09 UTC (rev 1997)
@@ -21,6 +21,7 @@
<export name="for"><elementDef classname="org.globus.cog.karajan.workflow.nodes.For"/></export>
<export name="parallelFor"><elementDef classname="org.globus.cog.karajan.workflow.nodes.ParallelFor"/></export>
<export name="uParallelFor"><elementDef classname="org.globus.cog.karajan.workflow.nodes.UParallelFor"/></export>
+ <export name="raceFor"><elementDef classname="org.globus.cog.karajan.workflow.nodes.RaceFor"/></export>
<export name="each"><elementDef classname="org.globus.cog.karajan.workflow.nodes.Each"/></export>
<export name="default"><elementDef classname="org.globus.cog.karajan.workflow.nodes.DefaultParameterNode"/></export>
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-30 23:35:37
|
Revision: 1996
http://cogkit.svn.sourceforge.net/cogkit/?rev=1996&view=rev
Author: hategan
Date: 2008-04-30 16:35:34 -0700 (Wed, 30 Apr 2008)
Log Message:
-----------
added raceFor
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/AbstractIterator.java
Added Paths:
-----------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/RaceFor.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/AbstractIterator.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/AbstractIterator.java 2008-04-30 23:34:45 UTC (rev 1995)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/AbstractIterator.java 2008-04-30 23:35:34 UTC (rev 1996)
@@ -11,6 +11,7 @@
import org.apache.log4j.Logger;
import org.globus.cog.karajan.arguments.Arg;
+import org.globus.cog.karajan.stack.StackFrame;
import org.globus.cog.karajan.stack.VariableStack;
import org.globus.cog.karajan.util.Identifier;
import org.globus.cog.karajan.util.KarajanIterator;
@@ -43,6 +44,18 @@
protected final synchronized int preIncRunning(VariableStack stack) {
return stack.getRegs().preIncIB();
}
+
+ protected final int getRunning(VariableStack stack) {
+ return stack.getRegs().getIB();
+ }
+
+ protected final void setRunning(StackFrame frame, int running) {
+ frame.getRegs().setIB(running);
+ }
+
+ protected final int getRunning(StackFrame frame) {
+ return frame.getRegs().getIB();
+ }
protected void partialArgumentsEvaluated(VariableStack stack) throws ExecutionException {
Identifier var = TypeUtil.toIdentifier(A_NAME.getValue(stack));
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/RaceFor.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/RaceFor.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/RaceFor.java 2008-04-30 23:35:34 UTC (rev 1996)
@@ -0,0 +1,109 @@
+// ----------------------------------------------------------------------
+// This code is developed as part of the Java CoG Kit project
+// The terms of the license can be found at http://www.cogkit.org/license
+// This message may not be removed or altered.
+// ----------------------------------------------------------------------
+
+/*
+ * Created on Jul 7, 2003
+ */
+package org.globus.cog.karajan.workflow.nodes;
+
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.globus.cog.karajan.arguments.Arg;
+import org.globus.cog.karajan.arguments.ArgUtil;
+import org.globus.cog.karajan.arguments.NamedArguments;
+import org.globus.cog.karajan.arguments.VariableArguments;
+import org.globus.cog.karajan.stack.StackFrame;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.Identifier;
+import org.globus.cog.karajan.util.KarajanIterator;
+import org.globus.cog.karajan.util.ThreadingContext;
+import org.globus.cog.karajan.workflow.ExecutionException;
+
+public class RaceFor extends AbstractParallelIterator {
+ public static final Arg A_NAME = new Arg.Positional("name");
+ public static final Arg A_IN = new Arg.Positional("in");
+
+ static {
+ setArguments(RaceFor.class, new Arg[] { A_NAME, A_IN });
+ }
+
+
+
+ public void iterate(VariableStack stack, Identifier var, KarajanIterator i)
+ throws ExecutionException {
+ stack.enter();
+ ThreadingContext.set(stack, ThreadingContext.get(stack).split(0));
+ super.iterate(stack, var, i);
+ }
+
+ protected void initializeChannelBuffers(VariableStack stack) throws ExecutionException {
+ super.initializeChannelBuffers(stack);
+ }
+
+ protected void addChannelBuffers(VariableStack stack) throws ExecutionException {
+ ArgUtil.initializeNamedArguments(stack);
+ ArgUtil.initializeVariableArguments(stack);
+ ArgUtil.duplicateChannels(stack);
+ }
+
+ protected void closeBuffers(VariableStack stack) throws ExecutionException {
+ NamedArguments named = null;
+ VariableArguments vargs = null;
+ Map channels = null;
+
+ named = ArgUtil.getNamedArguments(stack);
+ vargs = ArgUtil.getVariableArguments(stack);
+ Set dchannels = ArgUtil.getDefinedChannels(stack);
+ channels = new Hashtable();
+ Iterator i = dchannels.iterator();
+ while (i.hasNext()) {
+ Arg.Channel channel = (Arg.Channel) i.next();
+ channels.put(channel, ArgUtil.getChannelArguments(stack, channel));
+ }
+
+ stack.leave();
+
+ VariableArguments ret = ArgUtil.getVariableReturn(stack);
+ ArgUtil.getNamedReturn(stack).merge(named);
+ ArgUtil.getVariableReturn(stack).merge(vargs);
+ i = channels.keySet().iterator();
+ while (i.hasNext()) {
+ Arg.Channel channel = (Arg.Channel) i.next();
+ channel.getReturn(stack).merge((VariableArguments) channels.get(channel));
+ }
+ }
+
+ protected void iterationCompleted(VariableStack stack) throws ExecutionException {
+ StackFrame parent = stack.parentFrame();
+ synchronized (parent) {
+ boolean b = parent.getRegs().getBB();
+ if (b) {
+ return;
+ }
+ else {
+ parent.getRegs().setBB(true);
+ }
+ }
+ closeBuffers(stack);
+ stack.setVar("#abort", true);
+ stack.getExecutionContext().getStateManager().abortContext(ThreadingContext.get(stack));
+ stack.leave();
+ complete(stack);
+ }
+
+ protected boolean testAndSetChildFailed(VariableStack stack) {
+ StackFrame parent = stack.parentFrame();
+ synchronized (parent) {
+ boolean b = parent.getRegs().getBB();
+ parent.getRegs().setBB(true);
+ return b;
+ }
+ }
+
+}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-30 23:34:48
|
Revision: 1995
http://cogkit.svn.sourceforge.net/cogkit/?rev=1995&view=rev
Author: hategan
Date: 2008-04-30 16:34:45 -0700 (Wed, 30 Apr 2008)
Log Message:
-----------
fixed aborts in wait()
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/WaitNode.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/WaitNode.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/WaitNode.java 2008-04-30 23:33:53 UTC (rev 1994)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/WaitNode.java 2008-04-30 23:34:45 UTC (rev 1995)
@@ -43,6 +43,7 @@
if (stack.isDefined("#abort")) {
logger.debug("Aborting wait");
abort(stack);
+ return;
}
logger.debug("Stateful element count: "
+ stack.getExecutionContext().getStateManager().getExecuting().size());
@@ -54,12 +55,12 @@
}
}
if (A_DELAY.isPresent(stack)) {
- timer.schedule(new Task(this, stack), TypeUtil.toInt(A_DELAY.getValue(stack)));
+ timer.schedule(newTask(stack), TypeUtil.toInt(A_DELAY.getValue(stack)));
}
else if (A_UNTIL.isPresent(stack)) {
String until = TypeUtil.toString(A_UNTIL.getValue(stack));
try {
- timer.schedule(new Task(this, stack), DateFormat.getDateTimeInstance().parse(until));
+ timer.schedule(newTask(stack), DateFormat.getDateTimeInstance().parse(until));
}
catch (ParseException e) {
try {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-30 23:34:05
|
Revision: 1994
http://cogkit.svn.sourceforge.net/cogkit/?rev=1994&view=rev
Author: hategan
Date: 2008-04-30 16:33:53 -0700 (Wed, 30 Apr 2008)
Log Message:
-----------
sync
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/AbstractRequestManager.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RemoteException.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/AbstractRequestManager.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/AbstractRequestManager.java 2008-04-30 15:26:38 UTC (rev 1993)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/AbstractRequestManager.java 2008-04-30 23:33:53 UTC (rev 1994)
@@ -38,7 +38,6 @@
handler = (RequestHandler) handlerClass.newInstance();
}
catch (Exception e) {
-
throw new NoSuchHandlerException("Could not instantiate handler for " + cmd, e);
}
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RemoteException.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RemoteException.java 2008-04-30 15:26:38 UTC (rev 1993)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RemoteException.java 2008-04-30 23:33:53 UTC (rev 1994)
@@ -24,6 +24,7 @@
s.println("Remote exception:");
s.println(remote);
}
+
public void printStackTrace(PrintWriter s) {
super.printStackTrace(s);
s.println("Remote exception:");
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java 2008-04-30 15:26:38 UTC (rev 1993)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java 2008-04-30 23:33:53 UTC (rev 1994)
@@ -267,7 +267,7 @@
}
}
if (!any) {
- Thread.sleep(10);
+ Thread.sleep(20);
}
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-30 15:26:56
|
Revision: 1993
http://cogkit.svn.sourceforge.net/cogkit/?rev=1993&view=rev
Author: hategan
Date: 2008-04-30 08:26:38 -0700 (Wed, 30 Apr 2008)
Log Message:
-----------
only print warning when redirection requested
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-gt4_0_0/CHANGES.txt
trunk/current/src/cog/modules/provider-gt4_0_0/src/org/globus/cog/abstraction/impl/execution/gt4_0_0/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-gt4_0_0/CHANGES.txt
===================================================================
--- trunk/current/src/cog/modules/provider-gt4_0_0/CHANGES.txt 2008-04-30 02:28:21 UTC (rev 1992)
+++ trunk/current/src/cog/modules/provider-gt4_0_0/CHANGES.txt 2008-04-30 15:26:38 UTC (rev 1993)
@@ -1,3 +1,10 @@
+(04/30/2008)
+
+*** Print a warning instead of failing when redirection is
+ requested. Redirection requests will be silently ignored.
+ This can be overriden by saying "fail.on.redirect=true"
+ in the provider properties file.
+
(03/06/2008)
*** Added support for host_types attribute (patch from benc)
Modified: trunk/current/src/cog/modules/provider-gt4_0_0/src/org/globus/cog/abstraction/impl/execution/gt4_0_0/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-gt4_0_0/src/org/globus/cog/abstraction/impl/execution/gt4_0_0/JobSubmissionTaskHandler.java 2008-04-30 02:28:21 UTC (rev 1992)
+++ trunk/current/src/cog/modules/provider-gt4_0_0/src/org/globus/cog/abstraction/impl/execution/gt4_0_0/JobSubmissionTaskHandler.java 2008-04-30 15:26:38 UTC (rev 1993)
@@ -18,8 +18,10 @@
import org.apache.axis.types.NonNegativeInteger;
import org.apache.axis.types.PositiveInteger;
import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.common.AbstractionProperties;
import org.globus.cog.abstraction.impl.common.StatusImpl;
import org.globus.cog.abstraction.impl.common.task.IllegalSpecException;
+import org.globus.cog.abstraction.impl.common.task.InvalidProviderException;
import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
import org.globus.cog.abstraction.impl.common.task.InvalidServiceContactException;
import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
@@ -317,8 +319,7 @@
boolean batchJob = spec.isBatchJob();
if (FileLocation.MEMORY_AND_LOCAL.overlaps(spec.getStdOutputLocation()
.and(spec.getStdErrorLocation()))) {
- throw new IllegalSpecException(
- "The gt4.0.0 provider does not support redirection");
+ complainAboutRedirection();
}
else {
if (spec.getStdInput() != null) {
@@ -355,6 +356,33 @@
return desc;
}
+ private static boolean redirectionWarning;
+
+ private static void complainAboutRedirection() throws IllegalSpecException,
+ TaskSubmissionException {
+ try {
+ if ("true".equals(AbstractionProperties.getProperties("gt4")
+ .getProperty("fail.on.redirect"))) {
+ throw new IllegalSpecException(
+ "The gt4.0.0 provider does not support redirection");
+ }
+ else {
+ synchronized (JobSubmissionTaskHandler.class) {
+ if (!redirectionWarning) {
+ redirectionWarning = true;
+ logger
+ .warn("The GT4 provider does not support redirection. "
+ + "Redirection requests will be ignored without further warnings.");
+ }
+ }
+ }
+ }
+ catch (InvalidProviderException e) {
+ throw new TaskSubmissionException("Cannot get provider properties",
+ e);
+ }
+ }
+
public void stateChanged(GramJob job) {
boolean cleanup = false;
StateEnumeration state = job.getState();
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-30 02:28:27
|
Revision: 1992
http://cogkit.svn.sourceforge.net/cogkit/?rev=1992&view=rev
Author: hategan
Date: 2008-04-29 19:28:21 -0700 (Tue, 29 Apr 2008)
Log Message:
-----------
removed redirection (to make it useable with ws-gram)
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-29 12:37:24 UTC (rev 1991)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-30 02:28:21 UTC (rev 1992)
@@ -34,7 +34,6 @@
import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
import org.globus.cog.abstraction.impl.common.task.TaskImpl;
import org.globus.cog.abstraction.interfaces.ExecutionService;
-import org.globus.cog.abstraction.interfaces.FileLocation;
import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.Task;
@@ -178,8 +177,6 @@
js.addArgument(script.getAbsolutePath());
js.addArgument(id);
js.addArgument(callbackURI.toString());
- js.setStdOutputLocation(FileLocation.MEMORY);
- js.setStdErrorLocation(FileLocation.MEMORY);
js.setAttribute("maxwalltime", new WallTime(maxWallTime)
.getSpecInMinutes());
return js;
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-29 12:37:24 UTC (rev 1991)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-30 02:28:21 UTC (rev 1992)
@@ -201,8 +201,6 @@
s.setServiceContact(orig.getService(0).getServiceContact());
s.setJobManager("fork");
t.setService(0, s);
- js.setStdOutputLocation(FileLocation.MEMORY);
- js.setStdErrorLocation(FileLocation.MEMORY);
return t;
}
catch (Exception e) {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-28 18:06:13
|
Revision: 1990
http://cogkit.svn.sourceforge.net/cogkit/?rev=1990&view=rev
Author: hategan
Date: 2008-04-28 11:05:13 -0700 (Mon, 28 Apr 2008)
Log Message:
-----------
removed unnecessary cast
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/restartLog/LogEntry.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/restartLog/LogEntry.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/restartLog/LogEntry.java 2008-04-25 13:28:17 UTC (rev 1989)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/restartLog/LogEntry.java 2008-04-28 18:05:13 UTC (rev 1990)
@@ -35,7 +35,7 @@
public static LogEntry build(VariableStack stack, FlowElement fe) throws ExecutionException {
LogEntry entry = new LogEntry();
- entry.key = ThreadingContext.get(stack) + ":" + (Integer) fe.getProperty(FlowElement.UID);
+ entry.key = ThreadingContext.get(stack) + ":" + fe.getProperty(FlowElement.UID);
return entry;
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-25 13:28:25
|
Revision: 1989
http://cogkit.svn.sourceforge.net/cogkit/?rev=1989&view=rev
Author: hategan
Date: 2008-04-25 06:28:17 -0700 (Fri, 25 Apr 2008)
Log Message:
-----------
track and account for fallen services
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-25 02:16:23 UTC (rev 1988)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-25 13:28:17 UTC (rev 1989)
@@ -25,6 +25,7 @@
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.coaster.service.ServiceShutdownCommand;
import org.globus.cog.abstraction.coaster.service.local.LocalService;
+import org.globus.cog.abstraction.impl.common.StatusEvent;
import org.globus.cog.abstraction.impl.common.task.ExecutionServiceImpl;
import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
import org.globus.cog.abstraction.impl.common.task.TaskImpl;
@@ -33,12 +34,14 @@
import org.globus.cog.abstraction.interfaces.ExecutionService;
import org.globus.cog.abstraction.interfaces.FileLocation;
import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Status;
+import org.globus.cog.abstraction.interfaces.StatusListener;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.abstraction.interfaces.TaskHandler;
import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
import org.ietf.jgss.GSSCredential;
-public class ServiceManager {
+public class ServiceManager implements StatusListener {
public static final Logger logger = Logger
.getLogger(ServiceManager.class);
@@ -115,6 +118,7 @@
try {
startLocalService();
Task t = buildTask(task);
+ t.addStatusListener(this);
if (logger.isDebugEnabled()) {
logger.debug("Starting coaster service on "
+ task.getService(0) + ". Task is " + t);
@@ -124,7 +128,8 @@
.getAttribute(TASK_ATTR_ID));
synchronized (services) {
services.put(service, url);
- credentials.put(url, task.getService(0).getSecurityContext().getCredentials());
+ credentials.put(url, task.getService(0).getSecurityContext()
+ .getCredentials());
}
return url;
}
@@ -136,6 +141,27 @@
}
}
+ public void statusChanged(StatusEvent event) {
+ Task t = (Task) event.getSource();
+ Status s = event.getStatus();
+ if (s.isTerminal()) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Service task " + t
+ + " terminated. Removing service.");
+ }
+ synchronized (services) {
+ Object service = getService(t);
+ String url = (String) services.remove(service);
+ if (url == null) {
+ logger.info("Service does not appear to be registered with this manager");
+ }
+ else {
+ credentials.remove(url);
+ }
+ }
+ }
+ }
+
private static final Integer ZERO = new Integer(0);
protected void increaseUsageCount(Object service) {
@@ -255,24 +281,26 @@
}
return String.valueOf(r);
}
-
+
private class ServiceReaper extends Thread {
-
+
public ServiceReaper() {
setName("Coaster service reaper");
}
-
+
public void run() {
Iterator i = services.values().iterator();
while (i.hasNext()) {
String url = (String) i.next();
Object cred = credentials.get(url);
try {
- KarajanChannel channel = CoasterChannelManager.getManager()
- .reserveChannel(url, (GSSCredential) cred);
+ KarajanChannel channel = CoasterChannelManager
+ .getManager().reserveChannel(url,
+ (GSSCredential) cred);
ServiceShutdownCommand ssc = new ServiceShutdownCommand();
ssc.execute(channel);
- CoasterChannelManager.getManager().releaseChannel(channel);
+ CoasterChannelManager.getManager()
+ .releaseChannel(channel);
}
catch (Exception e) {
logger.warn("Failed to shut down service " + url, e);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-25 02:16:25
|
Revision: 1988
http://cogkit.svn.sourceforge.net/cogkit/?rev=1988&view=rev
Author: hategan
Date: 2008-04-24 19:16:23 -0700 (Thu, 24 Apr 2008)
Log Message:
-----------
fixed walltime spec; shutdown of queued workers; fixed idle race
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -9,21 +9,26 @@
*/
package org.globus.cog.abstraction.coaster.service;
+import org.apache.log4j.Logger;
import org.globus.cog.karajan.workflow.service.ProtocolException;
import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
+public class ServiceShutdownHandler extends RequestHandler {
+ public static final Logger logger = Logger
+ .getLogger(ServiceShutdownHandler.class);
-public class ServiceShutdownHandler extends RequestHandler {
public static final String NAME = "SHUTDOWNSERVICE";
-
- public void requestComplete() throws ProtocolException {
- try {
- CoasterService cs = (CoasterService) getChannel().getChannelContext().getService();
- sendReply("OK");
- cs.shutdown();
- }
- catch (Exception e) {
- throw new ProtocolException("Failed to shut down service", e);
- }
- }
+
+ public void requestComplete() throws ProtocolException {
+ try {
+ CoasterService cs = (CoasterService) getChannel()
+ .getChannelContext().getService();
+ sendReply("OK");
+ cs.shutdown();
+ }
+ catch (Exception e) {
+ logger.warn("Failed to shut down service", e);
+ throw new ProtocolException("Failed to shut down service", e);
+ }
+ }
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -72,6 +72,10 @@
manager.workerTerminated(this);
}
}
+
+ public Task getWorkerTask() {
+ return task;
+ }
public Task getRunning() {
return running;
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -14,6 +14,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -147,7 +148,9 @@
private void startWorker(int maxWallTime, Task prototype)
throws InvalidServiceContactException {
int id = sr.nextInt();
- System.err.println("Starting worker with id=" + id);
+ if (logger.isInfoEnabled()) {
+ logger.info("Starting worker with id=" + id + " and maxwalltime=" + maxWallTime + "s");
+ }
String sid = String.valueOf(id);
Task t = new TaskImpl();
t.setType(Task.JOB_SUBMISSION);
@@ -177,7 +180,8 @@
js.addArgument(callbackURI.toString());
js.setStdOutputLocation(FileLocation.MEMORY);
js.setStdErrorLocation(FileLocation.MEMORY);
- js.setAttribute("maxwalltime", new WallTime(maxWallTime).getSpecInMinutes());
+ js.setAttribute("maxwalltime", new WallTime(maxWallTime)
+ .getSpecInMinutes());
return js;
}
@@ -188,7 +192,7 @@
ExecutionService p = (ExecutionService) prototype.getService(0);
String jm = p.getJobManager();
int colon = jm.indexOf(':');
- //remove provider used to bootstrap coasters
+ // remove provider used to bootstrap coasters
jm = jm.substring(colon + 1);
colon = jm.indexOf(':');
if (colon == -1) {
@@ -266,8 +270,8 @@
}
synchronized (allocationRequests) {
if (allocationRequests.size() < MAX_STARTING_WORKERS) {
- allocationRequests.add(new AllocationRequest(maxWallTime,
- prototype));
+ allocationRequests.add(new AllocationRequest(
+ maxWallTime, prototype));
allocationRequests.notify();
}
else {
@@ -368,10 +372,24 @@
}
public void shutdown() {
- Iterator i = ready.values().iterator();
- while (i.hasNext()) {
- Worker wr = (Worker) i.next();
- wr.shutdown();
+ synchronized (this) {
+ Iterator i;
+ i = ready.values().iterator();
+ while (i.hasNext()) {
+ Worker wr = (Worker) i.next();
+ wr.shutdown();
+ }
+ i = new ArrayList(requested.values()).iterator();
+ while (i.hasNext()) {
+ Worker wr = (Worker) i.next();
+ try {
+ handler.cancel(wr.getWorkerTask());
+ }
+ catch (Exception e) {
+ logger.warn("Failed to cancel queued worker task "
+ + wr.getWorkerTask(), e);
+ }
+ }
}
}
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -234,7 +234,7 @@
public static void main(String[] args) {
try {
long s = System.currentTimeMillis();
- Task[] ts = new Task[2];
+ Task[] ts = new Task[512];
for (int i = 0; i < ts.length; i++) {
ts[i] = submitTask();
if (i % 100 == 0) {
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java 2008-04-25 02:10:10 UTC (rev 1987)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java 2008-04-25 02:16:23 UTC (rev 1988)
@@ -69,6 +69,7 @@
else {
task = (Task) tasks.get(id);
}
+ lastNotificationTime = System.currentTimeMillis();
}
if (task != null) {
setStatus(task, s);
@@ -82,7 +83,7 @@
public long getIdleTime() {
synchronized(tasks) {
- if (tasks.size() == 0) {
+ if (tasks.size() == 0 && lastNotificationTime != 0) {
return System.currentTimeMillis() - lastNotificationTime;
}
else {
@@ -98,9 +99,6 @@
}
private void setStatus(Task t, Status s) {
- synchronized(tasks) {
- lastNotificationTime = System.currentTimeMillis();
- }
try {
t.setStatus(s);
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-25 02:10:21
|
Revision: 1987
http://cogkit.svn.sourceforge.net/cogkit/?rev=1987&view=rev
Author: hategan
Date: 2008-04-24 19:10:10 -0700 (Thu, 24 Apr 2008)
Log Message:
-----------
added job canceling to pbs provider
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-localscheduler/CHANGES.txt
trunk/current/src/cog/modules/provider-localscheduler/etc/provider-pbs.properties
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/PBSExecutor.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/Properties.java
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-localscheduler/CHANGES.txt
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/CHANGES.txt 2008-04-25 00:57:55 UTC (rev 1986)
+++ trunk/current/src/cog/modules/provider-localscheduler/CHANGES.txt 2008-04-25 02:10:10 UTC (rev 1987)
@@ -1,3 +1,7 @@
+(04/24/08)
+
+*** Added job canceling to PBS
+
(09/19/07)
*** Cobalt: 'count' attribute mapped to '-c'. The default should
Modified: trunk/current/src/cog/modules/provider-localscheduler/etc/provider-pbs.properties
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/etc/provider-pbs.properties 2008-04-25 00:57:55 UTC (rev 1986)
+++ trunk/current/src/cog/modules/provider-localscheduler/etc/provider-pbs.properties 2008-04-25 02:10:10 UTC (rev 1987)
@@ -15,7 +15,12 @@
#
qstat=qstat
+#
+# The path to qdel. The default assumes that qdel is in PATH
+#
+qdel=qdel
+
#
# If the jobType attribute is specified, then the PBS provider
# will look for a property named "wrapper.<jobType>" and prepend
Modified: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/PBSExecutor.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/PBSExecutor.java 2008-04-25 00:57:55 UTC (rev 1986)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/PBSExecutor.java 2008-04-25 02:10:10 UTC (rev 1987)
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
import org.globus.cog.abstraction.impl.scheduler.common.Job;
import org.globus.cog.abstraction.impl.scheduler.common.ProcessException;
import org.globus.cog.abstraction.impl.scheduler.common.ProcessListener;
@@ -36,10 +37,10 @@
private Task task;
private static QueuePoller poller;
private ProcessListener listener;
- private String stdout, stderr, exitcode;
+ private String stdout, stderr, exitcode, jobid;
private File script;
private static boolean debug;
-
+
static {
debug = "true".equals(Properties.getProperties().getProperty("debug"));
}
@@ -116,7 +117,7 @@
}
}
- String jobid = getOutput(process.getInputStream());
+ jobid = getOutput(process.getInputStream());
process.getInputStream().close();
getProcessPoller().addJob(
@@ -124,6 +125,31 @@
spec.getStdErrorLocation(), exitcode, this));
}
+ public void cancel() throws TaskSubmissionException {
+ if (jobid == null) {
+ throw new TaskSubmissionException("Can only cancel an active task");
+ }
+ String[] cmdline = new String[] { Properties.getProperties().getQDel(),
+ jobid };
+ try {
+ System.out.println("Canceling job " + jobid);
+ Process process = Runtime.getRuntime().exec(cmdline, null, null);
+ int ec = process.waitFor();
+ if (ec != 0) {
+ throw new TaskSubmissionException(
+ "Failed to cancel task. qdel returned with an exit code of "
+ + ec);
+ }
+ }
+ catch (InterruptedException e) {
+ throw new TaskSubmissionException(
+ "Thread interrupted while waiting for qdel to finish");
+ }
+ catch (IOException e) {
+ throw new TaskSubmissionException("Failed to cancel task", e);
+ }
+ }
+
private void error(String message) {
listener.processFailed(message);
}
@@ -197,9 +223,9 @@
wr.write("/bin/echo $? >" + exitcodefile + '\n');
wr.close();
}
-
+
private static final boolean[] TRIGGERS;
-
+
static {
TRIGGERS = new boolean[128];
TRIGGERS[' '] = true;
Modified: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/Properties.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/Properties.java 2008-04-25 00:57:55 UTC (rev 1986)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/Properties.java 2008-04-25 02:10:10 UTC (rev 1987)
@@ -21,7 +21,8 @@
public static final String POLL_INTERVAL = "poll.interval";
public static final String QSUB = "qsub";
- public static final String QSTAT = "qstat";
+ public static final String QSTAT = "qstat";
+ public static final String QDEL = "qdel";
private static Properties properties;
@@ -52,6 +53,7 @@
setPollInterval(5);
setQSub("qsub");
setQStat("qstat");
+ setQDel("qdel");
}
public void setPollInterval(int value) {
@@ -77,4 +79,12 @@
public String getQStat() {
return getProperty(QSTAT);
}
+
+ public String getQDel() {
+ return getProperty(QDEL);
+ }
+
+ public void setQDel(String qdel) {
+ setProperty(QDEL, qdel);
+ }
}
Modified: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/JobSubmissionTaskHandler.java 2008-04-25 00:57:55 UTC (rev 1986)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/execution/JobSubmissionTaskHandler.java 2008-04-25 02:10:10 UTC (rev 1987)
@@ -31,6 +31,7 @@
private Task task;
private JobSpecification spec;
private Thread thread;
+ private PBSExecutor executor;
public void submit(Task task) throws IllegalSpecException,
InvalidSecurityContextException, InvalidServiceContactException,
@@ -60,7 +61,8 @@
try {
synchronized(this) {
if (this.task.getStatus().getStatusCode() != Status.CANCELED) {
- new PBSExecutor(task, this).start();
+ executor = new PBSExecutor(task, this);
+ executor.start();
this.task.setStatus(Status.SUBMITTED);
if (spec.isBatchJob()) {
this.task.setStatus(Status.COMPLETED);
@@ -90,7 +92,7 @@
public synchronized void cancel() throws InvalidSecurityContextException,
TaskSubmissionException {
- //TODO actually cancel the job
+ executor.cancel();
this.task.setStatus(Status.CANCELED);
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-25 00:57:59
|
Revision: 1986
http://cogkit.svn.sourceforge.net/cogkit/?rev=1986&view=rev
Author: hategan
Date: 2008-04-24 17:57:55 -0700 (Thu, 24 Apr 2008)
Log Message:
-----------
not sure what's happening, but Torque on TP and TGUC doesn't seem to like seconds in maxwalltime
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WallTime.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WallTime.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WallTime.java 2008-04-25 00:57:08 UTC (rev 1985)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WallTime.java 2008-04-25 00:57:55 UTC (rev 1986)
@@ -27,6 +27,10 @@
return spec;
}
+ public String getSpecInMinutes() {
+ return spec.substring(spec.length() - 4);
+ }
+
public int getSeconds() {
return seconds;
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-25 00:57:08 UTC (rev 1985)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-25 00:57:55 UTC (rev 1986)
@@ -177,7 +177,7 @@
js.addArgument(callbackURI.toString());
js.setStdOutputLocation(FileLocation.MEMORY);
js.setStdErrorLocation(FileLocation.MEMORY);
- js.setAttribute("maxwalltime", new WallTime(maxWallTime).getSpec());
+ js.setAttribute("maxwalltime", new WallTime(maxWallTime).getSpecInMinutes());
return js;
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-25 00:57:20
|
Revision: 1985
http://cogkit.svn.sourceforge.net/cogkit/?rev=1985&view=rev
Author: hategan
Date: 2008-04-24 17:57:08 -0700 (Thu, 24 Apr 2008)
Log Message:
-----------
increased registration timeout to 5 minutes (jars seem to download slowly from my home connection)
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java 2008-04-25 00:56:20 UTC (rev 1984)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java 2008-04-25 00:57:08 UTC (rev 1985)
@@ -25,7 +25,7 @@
public class LocalService extends GSSService implements Registering {
public static final Logger logger = Logger.getLogger(LocalService.class);
- public static final long DEFAULT_REGISTRATION_TIMEOUT = 120 * 1000;
+ public static final long DEFAULT_REGISTRATION_TIMEOUT = 300 * 1000;
private Map services;
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
Revision: 1984
http://cogkit.svn.sourceforge.net/cogkit/?rev=1984&view=rev
Author: hategan
Date: 2008-04-24 17:56:20 -0700 (Thu, 24 Apr 2008)
Log Message:
-----------
allow remote provider without job manager
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-25 00:20:53 UTC (rev 1983)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-25 00:56:20 UTC (rev 1984)
@@ -103,9 +103,9 @@
throw new InvalidServiceContactException("Missing job manager");
}
String[] jmp = jm.split(":");
- if (jmp.length != 2) {
+ if (jmp.length < 2) {
throw new InvalidServiceContactException("Invalid job manager: "
- + jm + ". Use <provider>:<remote-job-manager>.");
+ + jm + ". Use <provider>:<remote-provider>[:<remote-job-manager>].");
}
return jmp[0];
}
@@ -214,12 +214,13 @@
//js.addArgument("0");
t.setSpecification(js);
ExecutionService s = new ExecutionServiceImpl();
- s.setServiceContact(new ServiceContactImpl("localhost"));
+ // s.setServiceContact(new ServiceContactImpl("localhost"));
//s.setServiceContact(new ServiceContactImpl("tp-grid1.ci.uchicago.edu"));
+ s.setServiceContact(new ServiceContactImpl("tg-grid1.uc.teragrid.org"));
// s.setServiceContact(new ServiceContactImpl("localhost:50013"));
s.setProvider("coaster");
- s.setJobManager("local:local");
- //s.setJobManager("gt2:pbs");
+ //s.setJobManager("local:local");
+ s.setJobManager("gt2:pbs");
s.setSecurityContext(new SecurityContextImpl());
t.setService(0, s);
// JobSubmissionTaskHandler th = new JobSubmissionTaskHandler(
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-25 00:20:56
|
Revision: 1983
http://cogkit.svn.sourceforge.net/cogkit/?rev=1983&view=rev
Author: hategan
Date: 2008-04-24 17:20:53 -0700 (Thu, 24 Apr 2008)
Log Message:
-----------
not all versions of mktemp support -t
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/resources/bootstrap.sh
Modified: trunk/current/src/cog/modules/provider-coaster/resources/bootstrap.sh
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/bootstrap.sh 2008-04-24 21:01:35 UTC (rev 1982)
+++ trunk/current/src/cog/modules/provider-coaster/resources/bootstrap.sh 2008-04-25 00:20:53 UTC (rev 1983)
@@ -14,7 +14,7 @@
if [ "$L" == "" ]; then
L=~/coaster-boot-$ID.log
fi
-DJ=`mktemp -t bootstrap.XXXXXX`
+DJ=`mktemp /tmp/bootstrap.XXXXXX`
echo "BS: $BS" >>$L
wget -c -q $BS/coaster-bootstrap.jar -O $DJ >>$L 2>&1
if [ "$?" != "0" ]; then
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-24 21:01:49
|
Revision: 1982
http://cogkit.svn.sourceforge.net/cogkit/?rev=1982&view=rev
Author: hategan
Date: 2008-04-24 14:01:35 -0700 (Thu, 24 Apr 2008)
Log Message:
-----------
only fail after 3 consecutive qstat failures
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/QueuePoller.java
Modified: trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/QueuePoller.java
===================================================================
--- trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/QueuePoller.java 2008-04-24 19:49:13 UTC (rev 1981)
+++ trunk/current/src/cog/modules/provider-localscheduler/src/org/globus/cog/abstraction/impl/scheduler/pbs/QueuePoller.java 2008-04-24 21:01:35 UTC (rev 1982)
@@ -25,12 +25,15 @@
public class QueuePoller extends Thread {
public static final Logger logger = Logger.getLogger(QueuePoller.class);
+
+ public static final int MAX_CONSECUTIVE_FAILURES = 3;
private LinkedList newjobs, donejobs;
private Set processed;
private Map jobs;
boolean any = false;
private int sleepTime;
+ private int failures;
public QueuePoller() {
setName("PBS-Local provider stream poller");
@@ -140,8 +143,14 @@
processStderr(pqstat.getErrorStream());
int ec = pqstat.waitFor();
if (ec != 0) {
- failAll("QStat failed (exit code " + ec + ")");
+ failures++;
+ if (failures >= MAX_CONSECUTIVE_FAILURES) {
+ failAll("QStat failed (exit code " + ec + ")");
+ }
}
+ else {
+ failures = 0;
+ }
if (logger.isDebugEnabled()) {
logger.debug("QStat done");
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-24 19:49:48
|
Revision: 1981
http://cogkit.svn.sourceforge.net/cogkit/?rev=1981&view=rev
Author: hategan
Date: 2008-04-24 12:49:13 -0700 (Thu, 24 Apr 2008)
Log Message:
-----------
bootstrap script fixes
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/resources/bootstrap.sh
Modified: trunk/current/src/cog/modules/provider-coaster/resources/bootstrap.sh
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/bootstrap.sh 2008-04-24 19:48:05 UTC (rev 1980)
+++ trunk/current/src/cog/modules/provider-coaster/resources/bootstrap.sh 2008-04-24 19:49:13 UTC (rev 1981)
@@ -14,13 +14,17 @@
if [ "$L" == "" ]; then
L=~/coaster-boot-$ID.log
fi
-DJ=`mktemp bootstrap.XXXXXX`
+DJ=`mktemp -t bootstrap.XXXXXX`
echo "BS: $BS" >>$L
wget -c -q $BS/coaster-bootstrap.jar -O $DJ >>$L 2>&1
if [ "$?" != "0" ]; then
error "Failed to download bootstrap jar from $BS"
fi
-AMD5=`/usr/bin/md5sum $DJ`
+MD5SUM=`which gmd5sum`
+if [ "X$MD5SUM" == "X" ]; then
+ MD5SUM=`which md5sum`
+fi
+AMD5=`$MD5SUM $DJ`
echo "Expected checksum: $EMD5" >>$L
echo "Computed checksum: ${AMD5:0:32}" >>$L
if [ "${AMD5:0:32}" != "$EMD5" ]; then
@@ -36,7 +40,7 @@
echo "JAVA=$JAVA" >>$L
if [ -x $JAVA ]; then
echo "$JAVA -Djava.home="$JAVA_HOME" -DX509_USER_PROXY="$X509_USER_PROXY" -DGLOBUS_HOSTNAME="$H" -jar $DJ $BS $LMD5 $LS $ID" >>$L
- $JAVA -Djava.home="$JAVA_HOME" -DX509_USER_PROXY="$X509_USER_PROXY" -DGLOBUS_HOSTNAME="$H" -jar $DJ $BS $LMD5 $LS $ID >>$L 2>&1
+ $JAVA -Djava.home="$JAVA_HOME" -DGLOBUS_TCP_PORT_RANGE="$GLOBUS_TCP_PORT_RANGE" -DX509_USER_PROXY="$X509_USER_PROXY" -DGLOBUS_HOSTNAME="$H" -jar $DJ $BS $LMD5 $LS $ID >>$L 2>&1
EC=$?
echo "Exit code: $EC" >>$L
rm -f $DJ
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-24 19:48:34
|
Revision: 1980
http://cogkit.svn.sourceforge.net/cogkit/?rev=1980&view=rev
Author: hategan
Date: 2008-04-24 12:48:05 -0700 (Thu, 24 Apr 2008)
Log Message:
-----------
ability to limit number of simultaneous worker startups
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-24 19:47:19 UTC (rev 1979)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-24 19:48:05 UTC (rev 1980)
@@ -57,7 +57,8 @@
public static final int OVERALLOCATION_FACTOR = 10;
- public static final int MAX_WORKERS = 100;
+ public static final int MAX_WORKERS = 256;
+ public static final int MAX_STARTING_WORKERS = 32;
private SortedMap ready;
private Map ids;
@@ -264,9 +265,17 @@
.info("No suitable worker found. Attempting to start a new one.");
}
synchronized (allocationRequests) {
- allocationRequests.add(new AllocationRequest(maxWallTime,
- prototype));
- allocationRequests.notify();
+ if (allocationRequests.size() < MAX_STARTING_WORKERS) {
+ allocationRequests.add(new AllocationRequest(maxWallTime,
+ prototype));
+ allocationRequests.notify();
+ }
+ else {
+ synchronized (this) {
+ this.wait(250);
+ }
+ return null;
+ }
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-24 19:47:43
|
Revision: 1979
http://cogkit.svn.sourceforge.net/cogkit/?rev=1979&view=rev
Author: hategan
Date: 2008-04-24 12:47:19 -0700 (Thu, 24 Apr 2008)
Log Message:
-----------
use GLOBUS_TCP_PORT_RANGE if available
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java 2008-04-24 16:14:29 UTC (rev 1978)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java 2008-04-24 19:47:19 UTC (rev 1979)
@@ -212,6 +212,7 @@
private void addProperties(List args) {
addProperty(args, "X509_USER_PROXY");
addProperty(args, "GLOBUS_HOSTNAME");
+ addProperty(args, "GLOBUS_TCP_PORT_RANGE");
}
private void addProperty(List args, String name) {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-24 16:14:49
|
Revision: 1978
http://cogkit.svn.sourceforge.net/cogkit/?rev=1978&view=rev
Author: hategan
Date: 2008-04-24 09:14:29 -0700 (Thu, 24 Apr 2008)
Log Message:
-----------
more of automatic shutdown
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java 2008-04-23 17:21:50 UTC (rev 1977)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterRequestManager.java 2008-04-24 16:14:29 UTC (rev 1978)
@@ -21,5 +21,6 @@
addHandler("CHANNELCONFIG", ChannelConfigurationHandler.class);
addHandler("SHUTDOWN", ShutdownHandler.class);
addHandler(SubmitJobCommand.NAME, SubmitJobHandler.class);
+ addHandler(ServiceShutdownHandler.NAME, ServiceShutdownHandler.class);
}
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-04-23 17:21:50 UTC (rev 1977)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-04-24 16:14:29 UTC (rev 1978)
@@ -159,7 +159,7 @@
public void shutdown() {
super.shutdown();
- //jobQueue.getWorkerManager().shutdown();
+ jobQueue.getWorkerManager().shutdown();
done = true;
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java 2008-04-23 17:21:50 UTC (rev 1977)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java 2008-04-24 16:14:29 UTC (rev 1978)
@@ -19,8 +19,8 @@
public void requestComplete() throws ProtocolException {
try {
CoasterService cs = (CoasterService) getChannel().getChannelContext().getService();
+ sendReply("OK");
cs.shutdown();
- sendReply("OK");
}
catch (Exception e) {
throw new ProtocolException("Failed to shut down service", e);
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java 2008-04-23 17:21:50 UTC (rev 1977)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java 2008-04-24 16:14:29 UTC (rev 1978)
@@ -9,14 +9,20 @@
*/
package org.globus.cog.abstraction.coaster.service.job.manager;
+import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.common.StatusEvent;
import org.globus.cog.abstraction.impl.common.StatusImpl;
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.StatusListener;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
+import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+import org.globus.cog.karajan.workflow.service.commands.ShutdownCommand;
public class Worker implements StatusListener {
+ public static final Logger logger = Logger.getLogger(Worker.class);
+
private Task task, running;
private String id;
private WorkerManager manager;
@@ -25,7 +31,7 @@
private int maxWallTime;
private Status error;
private ChannelContext channelContext;
-
+
private static final Long NEVER = new Long(Long.MAX_VALUE);
public Worker(WorkerManager manager, String id, int maxWallTime, Task w,
@@ -101,11 +107,11 @@
public void setScheduledTerminationTime(long l) {
this.scheduledTerminationTime = new Long(l);
}
-
+
public Status getStatus() {
- return error;
+ return error;
}
-
+
public String toString() {
return "Worker[" + id + "]";
}
@@ -113,8 +119,20 @@
public void setChannelContext(ChannelContext cc) {
this.channelContext = cc;
}
-
+
public ChannelContext getChannelContext() {
return this.channelContext;
}
+
+ public void shutdown() {
+ try {
+ KarajanChannel channel = ChannelManager.getManager()
+ .reserveChannel(channelContext);
+ ShutdownCommand sc = new ShutdownCommand();
+ sc.execute(channel);
+ }
+ catch (Exception e) {
+ logger.warn("Failed to shut down worker", e);
+ }
+ }
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-23 17:21:50 UTC (rev 1977)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-24 16:14:29 UTC (rev 1978)
@@ -359,5 +359,10 @@
}
public void shutdown() {
+ Iterator i = ready.values().iterator();
+ while (i.hasNext()) {
+ Worker wr = (Worker) i.next();
+ wr.shutdown();
+ }
}
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-23 17:21:50 UTC (rev 1977)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-24 16:14:29 UTC (rev 1978)
@@ -272,6 +272,7 @@
.reserveChannel(url, (GSSCredential) cred);
ServiceShutdownCommand ssc = new ServiceShutdownCommand();
ssc.execute(channel);
+ CoasterChannelManager.getManager().releaseChannel(channel);
}
catch (Exception e) {
logger.warn("Failed to shut down service " + url, e);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-23 17:23:38
|
Revision: 1977
http://cogkit.svn.sourceforge.net/cogkit/?rev=1977&view=rev
Author: hategan
Date: 2008-04-23 10:21:50 -0700 (Wed, 23 Apr 2008)
Log Message:
-----------
added some automatic service shutdown
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
Added Paths:
-----------
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownCommand.java
trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -16,6 +16,7 @@
import org.globus.cog.abstraction.coaster.service.job.manager.JobQueue;
import org.globus.cog.abstraction.coaster.service.local.JobStatusHandler;
import org.globus.cog.abstraction.coaster.service.local.RegistrationHandler;
+import org.globus.cog.abstraction.impl.execution.coaster.NotificationManager;
import org.globus.cog.karajan.workflow.service.ConnectionHandler;
import org.globus.cog.karajan.workflow.service.GSSService;
import org.globus.cog.karajan.workflow.service.RequestManager;
@@ -28,11 +29,14 @@
public static final Logger logger = Logger
.getLogger(CoasterService.class);
+ public static final int IDLE_TIMEOUT = 600 * 1000;
+
private String registrationURL, id;
private JobQueue jobQueue;
private LocalTCPService localService;
private Exception e;
private boolean done;
+ private boolean suspended;
public CoasterService() throws IOException {
this(null, null);
@@ -52,13 +56,23 @@
protected void handleConnection(Socket sock) {
logger.debug("Got connection");
- try {
- ConnectionHandler handler = new ConnectionHandler(this, sock,
- new CoasterRequestManager());
- handler.start();
+ if (isSuspended()) {
+ try {
+ sock.close();
+ }
+ catch (IOException e) {
+ logger.warn("Failed to close new connection", e);
+ }
}
- catch (Exception e) {
- logger.warn("Could not start connection handler", e);
+ else {
+ try {
+ ConnectionHandler handler = new ConnectionHandler(this, sock,
+ new CoasterRequestManager());
+ handler.start();
+ }
+ catch (Exception e) {
+ logger.warn("Could not start connection handler", e);
+ }
}
}
@@ -69,7 +83,9 @@
jobQueue = new JobQueue(localService);
jobQueue.start();
localService.setWorkerManager(jobQueue.getWorkerManager());
- logger.info("Started local service: " + localService.getContact());
+ logger
+ .info("Started local service: "
+ + localService.getContact());
if (id != null) {
try {
logger.info("Reserving channel for registration");
@@ -93,9 +109,9 @@
stop(e);
}
}
-
+
private void stop(Exception e) {
- synchronized(this) {
+ synchronized (this) {
this.e = e;
done = true;
notifyAll();
@@ -103,9 +119,10 @@
}
public void waitFor() throws Exception {
- synchronized(this) {
+ synchronized (this) {
while (!done) {
- wait();
+ wait(1000);
+ checkIdleTime();
}
if (e != null) {
throw e;
@@ -113,6 +130,39 @@
}
}
+ private synchronized void checkIdleTime() {
+ // the notification manager should probably not be a singleton
+ long idleTime = NotificationManager.getDefault().getIdleTime();
+ if (idleTime > IDLE_TIMEOUT) {
+ suspend();
+ if (NotificationManager.getDefault().getIdleTime() < IDLE_TIMEOUT) {
+ resume();
+ }
+ else {
+ logger.info("Idle time exceeded. Shutting down service.");
+ shutdown();
+ }
+ }
+ }
+
+ public synchronized void suspend() {
+ this.suspended = true;
+ }
+
+ public synchronized boolean isSuspended() {
+ return suspended;
+ }
+
+ public synchronized void resume() {
+ this.suspended = false;
+ }
+
+ public void shutdown() {
+ super.shutdown();
+ //jobQueue.getWorkerManager().shutdown();
+ done = true;
+ }
+
public JobQueue getJobQueue() {
return jobQueue;
}
@@ -127,7 +177,6 @@
s = new CoasterService(args[0], args[1]);
}
s.start();
- //JobSubmissionTaskHandler.main(new String[0]);
s.waitFor();
System.exit(0);
}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownCommand.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownCommand.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownCommand.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -0,0 +1,19 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 20, 2005
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import org.globus.cog.karajan.workflow.service.commands.Command;
+
+
+public class ServiceShutdownCommand extends Command {
+ public ServiceShutdownCommand() {
+ super(ServiceShutdownHandler.NAME);
+ }
+}
Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java (rev 0)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/ServiceShutdownHandler.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -0,0 +1,29 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 21, 2005
+ */
+package org.globus.cog.abstraction.coaster.service;
+
+import org.globus.cog.karajan.workflow.service.ProtocolException;
+import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
+
+
+public class ServiceShutdownHandler extends RequestHandler {
+ public static final String NAME = "SHUTDOWNSERVICE";
+
+ public void requestComplete() throws ProtocolException {
+ try {
+ CoasterService cs = (CoasterService) getChannel().getChannelContext().getService();
+ cs.shutdown();
+ sendReply("OK");
+ }
+ catch (Exception e) {
+ throw new ProtocolException("Failed to shut down service", e);
+ }
+ }
+}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -357,4 +357,7 @@
this.prototype = prototype;
}
}
+
+ public void shutdown() {
+ }
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -210,8 +210,8 @@
Task t = new TaskImpl();
t.setType(Task.JOB_SUBMISSION);
JobSpecification js = new JobSpecificationImpl();
- js.setExecutable("/bin/sleep");
- js.addArgument("0");
+ js.setExecutable("/bin/echo");
+ //js.addArgument("0");
t.setSpecification(js);
ExecutionService s = new ExecutionServiceImpl();
s.setServiceContact(new ServiceContactImpl("localhost"));
@@ -256,9 +256,11 @@
System.err.println("All " + ts.length + " jobs done");
System.err.println("Total time: "
+ (System.currentTimeMillis() - s));
+ System.exit(0);
}
catch (Exception e) {
e.printStackTrace();
+ System.exit(1);
}
}
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/NotificationManager.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -18,6 +18,12 @@
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.Task;
+/**
+ * This class is used to keep track of tasks sent
+ * to workers, since the worker is only aware of the
+ * task ID. A notification from a worker needs to
+ * be coupled with a Task object based on the ID.
+ */
public class NotificationManager {
public static final Logger logger = Logger
.getLogger(NotificationManager.class);
@@ -33,6 +39,7 @@
private Map tasks;
private Map pending;
+ private long lastNotificationTime;
public NotificationManager() {
tasks = new HashMap();
@@ -52,7 +59,7 @@
}
}
}
-
+
public void notificationReceived(String id, Status s) {
Task task;
synchronized (tasks) {
@@ -72,8 +79,28 @@
}
}
}
+
+ public long getIdleTime() {
+ synchronized(tasks) {
+ if (tasks.size() == 0) {
+ return System.currentTimeMillis() - lastNotificationTime;
+ }
+ else {
+ return 0;
+ }
+ }
+ }
+
+ public int getActiveTaskCount() {
+ synchronized(tasks) {
+ return tasks.size();
+ }
+ }
private void setStatus(Task t, Status s) {
+ synchronized(tasks) {
+ lastNotificationTime = System.currentTimeMillis();
+ }
try {
t.setStatus(s);
}
@@ -90,5 +117,4 @@
}
p.addLast(status);
}
-
}
Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-23 13:37:34 UTC (rev 1976)
+++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-04-23 17:21:50 UTC (rev 1977)
@@ -18,10 +18,12 @@
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.coaster.service.ServiceShutdownCommand;
import org.globus.cog.abstraction.coaster.service.local.LocalService;
import org.globus.cog.abstraction.impl.common.task.ExecutionServiceImpl;
import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
@@ -33,6 +35,8 @@
import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.abstraction.interfaces.TaskHandler;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+import org.ietf.jgss.GSSCredential;
public class ServiceManager {
public static final Logger logger = Logger
@@ -43,7 +47,6 @@
public static final String BOOTSTRAP_LIST = "coaster-bootstrap.list";
public static final String TASK_ATTR_ID = "coaster:serviceid";
- // public static final String BOOTSTRAP_SCRIPT = "test.sh";
private static ServiceManager defaultManager;
@@ -58,13 +61,18 @@
private BootstrapService bootstrapService;
private LocalService localService;
private Map services;
+ private Map credentials;
private Set starting;
private Map usageCount;
+ private ServiceReaper serviceReaper;
public ServiceManager() {
services = new HashMap();
+ credentials = new HashMap();
starting = new HashSet();
usageCount = new HashMap();
+ serviceReaper = new ServiceReaper();
+ Runtime.getRuntime().addShutdownHook(serviceReaper);
}
public String reserveService(Task task, TaskHandler bootHandler)
@@ -116,6 +124,7 @@
.getAttribute(TASK_ATTR_ID));
synchronized (services) {
services.put(service, url);
+ credentials.put(url, task.getService(0).getSecurityContext().getCredentials());
}
return url;
}
@@ -246,4 +255,29 @@
}
return String.valueOf(r);
}
+
+ private class ServiceReaper extends Thread {
+
+ public ServiceReaper() {
+ setName("Coaster service reaper");
+ }
+
+ public void run() {
+ Iterator i = services.values().iterator();
+ while (i.hasNext()) {
+ String url = (String) i.next();
+ Object cred = credentials.get(url);
+ try {
+ KarajanChannel channel = CoasterChannelManager.getManager()
+ .reserveChannel(url, (GSSCredential) cred);
+ ServiceShutdownCommand ssc = new ServiceShutdownCommand();
+ ssc.execute(channel);
+ }
+ catch (Exception e) {
+ logger.warn("Failed to shut down service " + url, e);
+ }
+ }
+ }
+
+ }
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-23 13:37:39
|
Revision: 1976
http://cogkit.svn.sourceforge.net/cogkit/?rev=1976&view=rev
Author: hategan
Date: 2008-04-23 06:37:34 -0700 (Wed, 23 Apr 2008)
Log Message:
-----------
don't ignore job directory
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
Modified: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-04-22 23:05:43 UTC (rev 1975)
+++ trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-04-23 13:37:34 UTC (rev 1976)
@@ -397,7 +397,11 @@
close STDERR;
open STDERR, $stderr;
}
- exec { $executable } @JOBARGS or queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$FAILED", "513", "Could not execute $executable: $!");
+ if (defined $JOB{directory}) {
+ chdir $JOB{directory};
+ }
+ wlog "Command: @JOBARGS\n";
+ exec { $executable } @JOBARGS or queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$FAILED", "513", "Could not execute $executable: $!");
die "Could not execute $executable: $!";
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-04-22 23:05:52
|
Revision: 1975
http://cogkit.svn.sourceforge.net/cogkit/?rev=1975&view=rev
Author: hategan
Date: 2008-04-22 16:05:43 -0700 (Tue, 22 Apr 2008)
Log Message:
-----------
waitpid returns more than just the exit code
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
Modified: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-04-22 22:12:02 UTC (rev 1974)
+++ trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-04-22 23:05:43 UTC (rev 1975)
@@ -368,7 +368,7 @@
else {
wlog "Forked process $pid. Waiting for its completion\n";
waitpid($pid, 0);
- $status = $?;
+ $status = $? & 0xff;
wlog "Child process $pid terminated. Status is $status. $!\n";
queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$COMPLETED", "$status", "");
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|