|
From: <ha...@us...> - 2008-07-30 00:19:14
|
Revision: 2119
http://cogkit.svn.sourceforge.net/cogkit/?rev=2119&view=rev
Author: hategan
Date: 2008-07-30 00:19:11 +0000 (Wed, 30 Jul 2008)
Log Message:
-----------
attempt 3 at fixing the scheduler
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/OverloadedHostMonitor.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/OverloadedHostMonitor.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/OverloadedHostMonitor.java 2008-07-29 21:30:31 UTC (rev 2118)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/OverloadedHostMonitor.java 2008-07-30 00:19:11 UTC (rev 2119)
@@ -9,9 +9,9 @@
*/
package org.globus.cog.karajan.scheduler;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
-import java.util.Map;
+import java.util.Set;
import org.apache.log4j.Logger;
@@ -19,7 +19,7 @@
public static final Logger logger = Logger.getLogger(OverloadedHostMonitor.class);
public static final int POLL_INTERVAL = 1000;
- private Map hosts;
+ private Set hosts;
private WeightedHostScoreScheduler whss;
private static final Integer[] DIRS = new Integer[] { new Integer(-1), new Integer(0),
@@ -28,15 +28,15 @@
public OverloadedHostMonitor(WeightedHostScoreScheduler whss) {
super("Overloaded Host Monitor");
setDaemon(true);
- hosts = new HashMap();
+ hosts = new HashSet();
this.whss = whss;
start();
}
- public void add(WeightedHost wh, int dir) {
+ public void add(WeightedHost wh) {
synchronized (hosts) {
- if (!hosts.containsKey(wh)) {
- hosts.put(wh, DIRS[dir + 1]);
+ if (!hosts.contains(wh)) {
+ hosts.add(wh);
}
}
}
@@ -47,13 +47,12 @@
Thread.sleep(POLL_INTERVAL);
try {
synchronized (hosts) {
- Iterator i = hosts.entrySet().iterator();
+ Iterator i = hosts.iterator();
while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- WeightedHost wh = (WeightedHost) e.getKey();
+ WeightedHost wh = (WeightedHost) i.next();
if (wh.isOverloaded() == 0) {
- Integer dir = (Integer) e.getValue();
- whss.updateOverloadedCount(dir.intValue());
+ System.err.println("Removing " + wh.getHost());
+ whss.removeOverloaded(wh);
i.remove();
}
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2008-07-29 21:30:31 UTC (rev 2118)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2008-07-30 00:19:11 UTC (rev 2119)
@@ -192,6 +192,7 @@
double sum = s.getSum();
if (policy == POLICY_WEIGHTED_RANDOM) {
double rand = Math.random() * sum;
+ System.err.println("Sorted: " + s);
if (logger.isInfoEnabled() && !s.isEmpty()) {
logger.info("Sorted: " + s);
}
@@ -478,8 +479,8 @@
}
}
- protected void updateOverloadedCount(int v) {
- sorted.updateOverloadedCount(v);
+ protected void removeOverloaded(WeightedHost wh) {
+ sorted.removeOverloaded(wh);
raiseTasksFinished();
}
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java 2008-07-29 21:30:31 UTC (rev 2118)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java 2008-07-30 00:19:11 UTC (rev 2119)
@@ -10,8 +10,10 @@
package org.globus.cog.karajan.scheduler;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.TreeSet;
import org.globus.cog.karajan.util.BoundContact;
@@ -21,7 +23,7 @@
private Map weightedHosts;
private double sum;
private double scoreHighCap;
- private volatile int overloadedCount;
+ private Set overloaded;
private OverloadedHostMonitor monitor;
public WeightedHostSet(double scoreHighCap) {
@@ -37,6 +39,7 @@
protected void init() {
scores = new TreeSet();
weightedHosts = new HashMap();
+ overloaded = new HashSet();
sum = 0;
}
@@ -44,58 +47,49 @@
scores.add(wh);
weightedHosts.put(wh.getHost(), wh);
sum += wh.getTScore();
- checkOverloaded(wh, 1);
+ checkOverloaded(wh);
}
public void changeScore(WeightedHost wh, double newScore) {
scores.remove(wh);
sum -= wh.getTScore();
- checkOverloaded(wh, -1);
wh.setScore(newScore);
weightedHosts.put(wh.getHost(), wh);
scores.add(wh);
sum += wh.getTScore();
- checkOverloaded(wh, 1);
+ checkOverloaded(wh);
}
public void changeLoad(WeightedHost wh, int dl) {
- checkOverloaded(wh, -1);
wh.changeLoad(dl);
- checkOverloaded(wh, 1);
+ checkOverloaded(wh);
}
public double remove(WeightedHost wh) {
scores.remove(wh);
weightedHosts.remove(wh.getHost());
sum -= wh.getScore();
- checkOverloaded(wh, -1);
+ removeOverloaded(wh);
return wh.getScore();
}
- private void checkOverloaded(WeightedHost wh, final int dir) {
+ private void checkOverloaded(WeightedHost wh) {
int v = wh.isOverloaded();
- int countDelta;
- if (v == 0) {
- // not overloaded
- countDelta = 0;
- }
- else if (v > 0) {
+ if (v > 0) {
// overloaded either too many tasks (v == 1) or delay already expired (v > 0)
// there's a bit of ambiguity there, but it does not make a difference
- if (dir > 0) {
- countDelta = v;
- }
- else {
- countDelta = -v;
- }
+ addOverloaded(wh);
}
- else {
+ else if (v < 0) {
+ System.err.println(wh.getHost() + " : " + v);
if (monitor != null) {
- monitor.add(wh, -dir);
+ monitor.add(wh);
}
- countDelta = dir;
+ addOverloaded(wh);
}
- updateOverloadedCount(countDelta);
+ else {
+ removeOverloaded(wh);
+ }
}
public WeightedHost findHost(BoundContact bc) {
@@ -144,12 +138,20 @@
}
public boolean allOverloaded() {
- return overloadedCount == weightedHosts.size();
+ synchronized(overloaded) {
+ return overloaded.size() == weightedHosts.size();
+ }
}
- protected void updateOverloadedCount(int dif) {
- if (dif != 0) {
- this.overloadedCount += dif;
+ protected void addOverloaded(WeightedHost wh) {
+ synchronized(overloaded) {
+ overloaded.add(wh);
}
}
+
+ protected void removeOverloaded(WeightedHost wh) {
+ synchronized(overloaded) {
+ overloaded.remove(wh);
+ }
+ }
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|