|
From: <ha...@us...> - 2006-10-17 22:08:30
|
Revision: 987
http://svn.sourceforge.net/cogkit/?rev=987&view=rev
Author: hategan
Date: 2006-10-17 15:08:22 -0700 (Tue, 17 Oct 2006)
Log Message:
-----------
shared schedulers
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/CHANGES.txt
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/ExecutionContext.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/GridExec.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/SchedulerNode.java
Modified: trunk/current/src/cog/modules/karajan/CHANGES.txt
===================================================================
--- trunk/current/src/cog/modules/karajan/CHANGES.txt 2006-10-17 21:16:04 UTC (rev 986)
+++ trunk/current/src/cog/modules/karajan/CHANGES.txt 2006-10-17 22:08:22 UTC (rev 987)
@@ -1,3 +1,10 @@
+(10/17/2006)
+
+*** Added the possibility to share schedulers (if in the same JVM
+ instance) between executions.
+
+*** Made sure the event bus is initialized in all cases
+
(10/16/2006)
*** Make task:execute(arguments=...) work with vargs
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/ExecutionContext.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/ExecutionContext.java 2006-10-17 21:16:04 UTC (rev 986)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/ExecutionContext.java 2006-10-17 22:08:22 UTC (rev 987)
@@ -112,11 +112,11 @@
}
public void start() {
- EventBus.initialize();
start(new LinkedStack(this));
}
public void start(VariableStack stack) {
+ EventBus.initialize();
startTime = System.currentTimeMillis();
if (arguments == null) {
arguments = Collections.EMPTY_LIST;
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/GridExec.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/GridExec.java 2006-10-17 21:16:04 UTC (rev 986)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/GridExec.java 2006-10-17 22:08:22 UTC (rev 987)
@@ -78,7 +78,7 @@
String provider = null;
while (i.hasNext()) {
String name = (String) i.next();
-
+
Object value = named.getArgument(name);
if (name.equals(A_EXECUTABLE.getName())) {
js.setExecutable(TypeUtil.toString(value));
@@ -154,7 +154,7 @@
js.setAttribute(name, value);
}
}
-
+
VariableArguments env = C_ENVIRONMENT.get(stack);
Iterator j = env.iterator();
while (j.hasNext()) {
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/SchedulerNode.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/SchedulerNode.java 2006-10-17 21:16:04 UTC (rev 986)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/nodes/grid/SchedulerNode.java 2006-10-17 22:08:22 UTC (rev 987)
@@ -6,7 +6,7 @@
package org.globus.cog.karajan.workflow.nodes.grid;
-import java.util.Hashtable;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -26,6 +26,7 @@
public static final Logger logger = Logger.getLogger(SchedulerNode.class);
public static final Arg A_TYPE = new Arg.Positional("type", 0);
+ public static final Arg A_SHARE_ID = new Arg.Optional("shareID");
public static final Arg A_RESOURCES = new Arg.Positional("resources", 1);
public static final Arg.Channel A_PROPERTIES = new Arg.Channel("properties");
public static final Arg.Channel A_TASK_TRANSFORMERS = new Arg.Channel("taskTransformers");
@@ -36,12 +37,14 @@
static {
setArguments(SchedulerNode.class, new Arg[] { A_TYPE, A_RESOURCES, A_PROPERTIES,
A_TASK_TRANSFORMERS, A_HANDLERS });
+ sharedInstances = new HashMap();
}
private static Map schedulers;
+ private static Map sharedInstances;
private void initializeSchedulers(KarajanProperties properties) {
- schedulers = new Hashtable();
+ schedulers = new HashMap();
Iterator i = properties.keySet().iterator();
while (i.hasNext()) {
String name = (String) i.next();
@@ -53,6 +56,25 @@
public void post(VariableStack stack) throws ExecutionException {
Scheduler s = null;
+ String shareID = TypeUtil.toString(A_SHARE_ID.getValue(stack, null));
+ if (shareID != null) {
+ synchronized(sharedInstances) {
+ s = (Scheduler) sharedInstances.get(shareID);
+ if (s == null) {
+ s = newScheduler(stack);
+ sharedInstances.put(shareID, s);
+ }
+ }
+ }
+ else {
+ s = newScheduler(stack);
+ }
+ stack.parentFrame().setVar(SCHEDULER, s);
+ super.post(stack);
+ }
+
+ protected Scheduler newScheduler(VariableStack stack) throws ExecutionException {
+ Scheduler s;
String type = TypeUtil.toString(A_TYPE.getValue(stack));
synchronized (SchedulerNode.class) {
if (schedulers == null) {
@@ -106,7 +128,6 @@
s.addTaskHandler((TaskHandlerWrapper) i.next());
}
- stack.parentFrame().setVar(SCHEDULER, s);
- super.post(stack);
+ return s;
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|