|
From: <ha...@us...> - 2006-11-06 19:52:41
|
Revision: 1340
http://svn.sourceforge.net/cogkit/?rev=1340&view=rev
Author: hategan
Date: 2006-11-06 11:52:31 -0800 (Mon, 06 Nov 2006)
Log Message:
-----------
added resource constraints
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
Added Paths:
-----------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ResourceConstraintChecker.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -21,6 +21,7 @@
import org.globus.cog.abstraction.interfaces.StatusListener;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.abstraction.interfaces.TaskHandler;
+import org.globus.cog.karajan.util.BoundContact;
import org.globus.cog.karajan.util.Contact;
import org.globus.cog.karajan.util.ContactSet;
import org.globus.cog.karajan.util.Queue;
@@ -48,6 +49,8 @@
private final Map constraints;
private List taskTransformers, failureHandlers;
+
+ private ResourceConstraintChecker constraintChecker;
public AbstractScheduler() {
super("Scheduler");
@@ -240,4 +243,30 @@
public void addFailureHandler(FailureHandler handler) {
failureHandlers.add(handler);
}
+
+ public ResourceConstraintChecker getConstraintChecker() {
+ return constraintChecker;
+ }
+
+ public void setConstraintChecker(ResourceConstraintChecker constraintChecker) {
+ this.constraintChecker = constraintChecker;
+ }
+
+ protected boolean checkConstraints(BoundContact resource, TaskConstraints tc) {
+ if (constraintChecker == null) {
+ return true;
+ }
+ else {
+ return constraintChecker.checkConstraints(resource, tc);
+ }
+ }
+
+ protected List checkConstraints(List resources, TaskConstraints tc) {
+ if (constraintChecker == null) {
+ return resources;
+ }
+ else {
+ return constraintChecker.checkConstraints(resources, tc);
+ }
+ }
}
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -0,0 +1,39 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Nov 3, 2006
+ */
+package org.globus.cog.karajan.scheduler;
+
+import org.globus.cog.abstraction.impl.common.task.TaskImpl;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.BoundContact;
+
+public class ContactAllocationTask extends TaskImpl {
+ private BoundContact contact;
+ private VariableStack stack;
+
+ public BoundContact getContact() {
+ return contact;
+ }
+
+ public void setContact(BoundContact contact) {
+ this.contact = contact;
+ }
+
+ public VariableStack getStack() {
+ return stack;
+ }
+
+ public void setStack(VariableStack stack) {
+ this.stack = stack;
+ }
+
+ public int getRequiredServices() {
+ return 1;
+ }
+}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java 2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -13,6 +13,7 @@
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.karajan.util.BoundContact;
import org.globus.cog.karajan.util.Contact;
+import org.globus.cog.karajan.util.ContactSet;
import org.globus.cog.karajan.util.TypeUtil;
public class DefaultScheduler extends LateBindingScheduler implements Scheduler, Runnable {
@@ -63,7 +64,9 @@
throws NoFreeResourceException {
checkGlobalLoadConditions();
int initial = contactCursor;
- while (!checkLoad(getResources().get(contactCursor))) {
+ ContactSet resources = getResources();
+ while (!checkLoad(resources.get(contactCursor))
+ || !checkConstraints(resources.get(contactCursor), t)) {
incContactCursor();
if (contactCursor == initial) {
logger.debug("No free resources");
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -423,6 +423,15 @@
public void submitBoundToServices(Task t, Contact[] contacts, Service[] services)
throws TaskSubmissionException {
+ if (t instanceof ContactAllocationTask) {
+ ((ContactAllocationTask) t).setContact((BoundContact) contacts[0]);
+ Status status = t.getStatus();
+ status.setPrevStatusCode(status.getStatusCode());
+ status.setStatusCode(Status.COMPLETED);
+ StatusEvent se = new StatusEvent(t, status);
+ fireJobStatusChangeEvent(se);
+ return;
+ }
applyTaskTransformers(t, contacts, services);
t.addStatusListener(this);
TaskHandler handler;
@@ -607,5 +616,4 @@
protected Contact[] getContacts(Task t) {
return (Contact[]) taskContacts.get(t);
}
-
}
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ResourceConstraintChecker.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ResourceConstraintChecker.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ResourceConstraintChecker.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -0,0 +1,21 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 18, 2006
+ */
+package org.globus.cog.karajan.scheduler;
+
+import java.util.List;
+
+import org.globus.cog.karajan.util.BoundContact;
+
+//I'm terrible with naming classes
+public interface ResourceConstraintChecker {
+ boolean checkConstraints(BoundContact resource, TaskConstraints tc);
+
+ List checkConstraints(List resources, TaskConstraints tc);
+}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java 2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -11,7 +11,7 @@
import org.globus.cog.karajan.util.BoundContact;
-public class WeightedHost {
+public class WeightedHost implements Comparable {
private BoundContact host;
private Double score;
@@ -51,4 +51,16 @@
public String toString() {
return host.toString();
}
+
+ public int compareTo(Object o) {
+ WeightedHost other = (WeightedHost) o;
+ int r = score.compareTo(other.score);
+ if (r == 0) {
+ //arbitrary ordering on the contact
+ return System.identityHashCode(host) - System.identityHashCode(other.host);
+ }
+ else {
+ return r;
+ }
+ }
}
\ No newline at end of file
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -38,11 +38,10 @@
public static final String FACTOR_FAILURE = "failureFactor";
public static final String SCORE_HIGH_CAP = "scoreHighCap";
public static final String SCORE_LOW_CAP = "scoreLowCap";
- public static final String RENORMALIZATION_DELAY = "renormalizationDelay";
+ public static final String NORMALIZATION_DELAY = "normalizationDelay";
public static final String POLICY = "policy";
private WeightedHostSet sorted;
- private double sum;
private int policy;
private int delay;
@@ -61,7 +60,7 @@
setFactor(FACTOR_FAILURE, 0.9);
setFactor(SCORE_HIGH_CAP, 100);
setFactor(SCORE_LOW_CAP, 0.001);
- setFactor(RENORMALIZATION_DELAY, 100);
+ setFactor(NORMALIZATION_DELAY, 100);
}
protected final void setFactor(String name, double value) {
@@ -83,21 +82,18 @@
protected void addToSorted(WeightedHost wh) {
sorted.add(wh);
- sum += wh.getScore();
}
protected synchronized void multiplyScore(WeightedHost wh, double factor) {
- double score = sorted.remove(wh);
+ double score = wh.getScore();
if (logger.isDebugEnabled()) {
logger.debug("multiplyScore(" + wh + ", " + factor + ")");
}
- sum -= score;
- WeightedHost nwh = new WeightedHost(wh.getHost(), checkCaps(score * factor));
+ double ns = checkCaps(score * factor);
+ sorted.changeScore(wh, ns);
if (logger.isDebugEnabled()) {
- logger.debug("Old score: " + score + ", new score: " + nwh.getScore());
+ logger.debug("Old score: " + score + ", new score: " + ns);
}
- sorted.add(nwh);
- sum += nwh.getScore();
}
protected double checkCaps(double score) {
@@ -112,17 +108,24 @@
}
}
- protected synchronized BoundContact getNextContact(TaskConstraints t) throws NoFreeResourceException {
+ protected synchronized BoundContact getNextContact(TaskConstraints t)
+ throws NoFreeResourceException {
checkGlobalLoadConditions();
BoundContact contact;
+
+ WeightedHostSet s = sorted;
+
+ s = constrain(s, getConstraintChecker(), t);
+
+ double sum = s.getSum();
if (policy == POLICY_WEIGHTED_RANDOM) {
double rand = Math.random() * sum;
if (logger.isDebugEnabled()) {
- logger.debug("Sorted: " + sorted);
+ logger.debug("Sorted: " + s);
logger.debug("Rand: " + rand + ", sum: " + sum);
}
- Iterator i = sorted.iterator();
- double sum = 0;
+ Iterator i = s.iterator();
+
while (i.hasNext()) {
WeightedHost wh = (WeightedHost) i.next();
sum += wh.getScore();
@@ -130,12 +133,12 @@
return wh.getHost();
}
}
- renormalize();
- contact = sorted.last().getHost();
+ normalize();
+ contact = s.last().getHost();
}
else if (policy == POLICY_BEST_SCORE) {
- renormalize();
- contact = sorted.last().getHost();
+ normalize();
+ contact = s.last().getHost();
}
else {
throw new KarajanRuntimeException("Invalid policy number: " + policy);
@@ -146,30 +149,32 @@
return contact;
}
- protected void renormalize() {
+ protected WeightedHostSet constrain(WeightedHostSet s, ResourceConstraintChecker rcc, TaskConstraints tc) {
+ if (rcc == null) {
+ return s;
+ }
+ else {
+ WeightedHostSet ns = new WeightedHostSet();
+ Iterator i = s.iterator();
+ while (i.hasNext()) {
+ WeightedHost wh = (WeightedHost) i.next();
+ if (rcc.checkConstraints(wh.getHost(), tc)) {
+ ns.add(wh);
+ }
+ }
+ return ns;
+ }
+ }
+
+ protected void normalize() {
delay++;
- if (delay > getFactor(RENORMALIZATION_DELAY)) {
+ if (delay > getFactor(NORMALIZATION_DELAY)) {
if (logger.isDebugEnabled()) {
- logger.debug("Renormalizing...");
+ logger.debug("Normalizing...");
logger.debug("Before normalization: " + sorted);
}
delay = 0;
- double prod = 1;
- Iterator i = sorted.iterator();
- while (i.hasNext()) {
- WeightedHost wh = (WeightedHost) i.next();
- prod *= wh.getScore();
- }
- double geomAvg = Math.pow(prod, 1.0 / sorted.size());
- double renormalizationFactor = 1 / geomAvg;
- i = sorted.iterator();
- sorted = new WeightedHostSet();
- while (i.hasNext()) {
- WeightedHost wh = (WeightedHost) i.next();
- WeightedHost nwh = new WeightedHost(wh.getHost(), checkCaps(wh.getScore()
- * renormalizationFactor));
- sorted.add(nwh);
- }
+ sorted.normalize(1);
if (logger.isDebugEnabled()) {
logger.debug("After normalization: " + sorted);
}
@@ -180,7 +185,7 @@
private static final String[] myPropertyNames = new String[] { POLICY,
FACTOR_CONNECTION_REFUSED, FACTOR_CONNECTION_TIMEOUT, FACTOR_SUBMISSION_TASK_LOAD,
FACTOR_TRANSFER_TASK_LOAD, FACTOR_FILEOP_TASK_LOAD, FACTOR_FAILURE, FACTOR_SUCCESS,
- SCORE_HIGH_CAP, SCORE_LOW_CAP, RENORMALIZATION_DELAY };
+ SCORE_HIGH_CAP, SCORE_LOW_CAP, NORMALIZATION_DELAY };
private static Set propertyNamesSet;
static {
@@ -236,10 +241,10 @@
Exception ex = e.getStatus().getException();
if (ex != null) {
String exs = ex.toString();
- if (exs.indexOf("Connection refused") > 0 || exs.indexOf("connection refused") > 0) {
+ if (exs.indexOf("Connection refused") >= 0 || exs.indexOf("connection refused") >= 0) {
factorMultiple(contacts, getFactor(FACTOR_CONNECTION_REFUSED));
}
- else if (exs.indexOf("timeout") > 0) {
+ else if (exs.indexOf("timeout") >= 0) {
factorMultiple(contacts, getFactor(FACTOR_CONNECTION_TIMEOUT));
}
}
@@ -250,15 +255,27 @@
private void factorSubmission(Task t, Contact[] contacts, int exp) {
// I wonder where the line between abstraction and obfuscation is...
if (t.getType() == Task.JOB_SUBMISSION) {
- factorMultiple(contacts, Math.pow(getFactor(FACTOR_SUBMISSION_TASK_LOAD), exp));
+ factorMultiple(contacts, spow(getFactor(FACTOR_SUBMISSION_TASK_LOAD), exp));
}
else if (t.getType() == Task.FILE_TRANSFER) {
- factorMultiple(contacts, Math.pow(getFactor(FACTOR_TRANSFER_TASK_LOAD), exp));
+ factorMultiple(contacts, spow(getFactor(FACTOR_TRANSFER_TASK_LOAD), exp));
}
else if (t.getType() == Task.FILE_OPERATION) {
- factorMultiple(contacts, Math.pow(getFactor(FACTOR_FILEOP_TASK_LOAD), exp));
+ factorMultiple(contacts, spow(getFactor(FACTOR_FILEOP_TASK_LOAD), exp));
}
}
+
+ private double spow(double x, int exp) {
+ if (exp == 1) {
+ return x;
+ }
+ else if (exp == -1) {
+ return 1/x;
+ }
+ else {
+ throw new IllegalArgumentException();
+ }
+ }
private void factorMultiple(Contact[] contacts, double factor) {
for (int i = 0; i < contacts.length; i++) {
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java 2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -9,108 +9,75 @@
*/
package org.globus.cog.karajan.scheduler;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.Map.Entry;
+import java.util.Iterator;
+import java.util.TreeSet;
public class WeightedHostSet {
- private TreeMap map;
- private HashMap scores;
+ private TreeSet scores;
+ private double sum;
public WeightedHostSet() {
- map = new TreeMap();
- scores = new HashMap();
+ init();
}
+ protected void init() {
+ scores = new TreeSet();
+ sum = 0;
+ }
+
public synchronized void add(WeightedHost wh) {
- Double score = wh.getScoreAsDouble();
- Set s = (Set) map.get(score);
- if (s == null) {
- s = new HashSet();
- map.put(score, s);
- }
-
- s.add(wh);
- scores.put(wh, score);
+ scores.add(wh);
+ sum += wh.getScore();
}
+ public synchronized void changeScore(WeightedHost wh, double newScore) {
+ scores.remove(wh);
+ sum -= wh.getScore();
+ scores.add(new WeightedHost(wh.getHost(), newScore));
+ sum += newScore;
+ }
+
public synchronized double remove(WeightedHost wh) {
- Double score = (Double) scores.get(wh);
- Set s = (Set) map.get(score);
- if (s != null) {
- s.remove(wh);
- if (s.isEmpty()) {
- map.remove(score);
- }
- scores.remove(wh);
- }
- return score.doubleValue();
+ scores.remove(wh);
+ return wh.getScore();
}
-
- public java.util.Iterator iterator() {
- return new Iterator();
+
+ public Iterator iterator() {
+ return scores.iterator();
}
-
+
public WeightedHost last() {
- Set s = (Set) map.get(map.lastKey());
- return (WeightedHost) s.iterator().next();
+ return (WeightedHost) scores.last();
}
-
+
public int size() {
return scores.size();
}
-
- public String toString() {
- return map.toString();
+
+ public double getSum() {
+ return sum;
}
-
- private class Iterator implements java.util.Iterator{
- private final java.util.Iterator mapi;
- private java.util.Iterator seti;
- private WeightedHost lasth;
- private Set lasts;
-
-
- public Iterator() {
- mapi = map.entrySet().iterator();
- if (mapi.hasNext()) {
- Entry next = (Entry) mapi.next();
- lasts = (Set) next.getValue();
- seti = lasts.iterator();
- }
- }
- public boolean hasNext() {
- return seti.hasNext() || mapi.hasNext();
+ protected synchronized void normalize(double target) {
+ double prod = 1;
+ Iterator i = scores.iterator();
+ while (i.hasNext()) {
+ WeightedHost wh = (WeightedHost) i.next();
+ prod *= wh.getScore();
}
-
- public Object next() {
- if (seti.hasNext()) {
- lasth = (WeightedHost) seti.next();
- return lasth;
- }
- else if (mapi.hasNext()) {
- Entry next = (Entry) mapi.next();
- lasts = (Set) next.getValue();
- seti = lasts.iterator();
- lasth = (WeightedHost) seti.next();
- return lasth;
- }
- else {
- throw new NoSuchElementException();
- }
+ double geomAvg = Math.pow(prod, target / scores.size());
+ double renormalizationFactor = 1 / geomAvg;
+ i = scores.iterator();
+ scores = new TreeSet();
+ while (i.hasNext()) {
+ WeightedHost wh = (WeightedHost) i.next();
+ WeightedHost nwh = new WeightedHost(wh.getHost(), wh.getScore()
+ * renormalizationFactor);
+ add(nwh);
}
-
- public void remove() {
- seti.remove();
- if (lasts.isEmpty()) {
- mapi.remove();
- }
- scores.remove(lasth);
- }
}
+ public String toString() {
+ return scores.toString();
+ }
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2006-11-30 22:31:30
|
Revision: 1440
http://svn.sourceforge.net/cogkit/?rev=1440&view=rev
Author: hategan
Date: 2006-11-30 14:31:21 -0800 (Thu, 30 Nov 2006)
Log Message:
-----------
tweaks (fixes actually) to the wh scheduler
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java 2006-11-30 22:29:56 UTC (rev 1439)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java 2006-11-30 22:31:21 UTC (rev 1440)
@@ -9,33 +9,76 @@
*/
package org.globus.cog.karajan.scheduler;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+
import org.globus.cog.karajan.util.BoundContact;
public class WeightedHost implements Comparable {
private BoundContact host;
private Double score;
+ private double tscore;
+ private int load;
+ private double delayedDelta;
public WeightedHost(BoundContact contact) {
- this(contact, 1.0);
+ this(contact, 0.0);
}
public WeightedHost(BoundContact contact, double score) {
+ this(contact, score, 0);
+ }
+
+ public WeightedHost(BoundContact contact, double score, int load) {
this.host = contact;
+ setScore(score);
+ this.load = load;
+ }
+
+ protected void setScore(double score) {
this.score = new Double(score);
+ this.tscore = smooth(score);
}
+ public static final double T = 100;
+ public static final double B = 2.0*Math.log(T)/Math.PI;
+ public static final double C = 0.2;
+
+ public double smooth(double score) {
+ return Math.exp(B*Math.atan(C*score));
+ }
+
public final double getScore() {
return score.doubleValue();
}
-
+
public final Double getScoreAsDouble() {
return score;
}
-
+
+ public final double getTScore() {
+ return tscore;
+ }
+
public final BoundContact getHost() {
return host;
}
+ public int getLoad() {
+ return load;
+ }
+
+ public void setLoad(int load) {
+ this.load = load;
+ }
+
+ public synchronized void changeLoad(int dl) {
+ load += dl;
+ if (load < 0) {
+ load = 0;
+ }
+ }
+
public boolean equals(Object obj) {
if (obj instanceof WeightedHost) {
WeightedHost wh = (WeightedHost) obj;
@@ -48,19 +91,36 @@
return host.hashCode();
}
+ public static final NumberFormat D4;
+ static {
+ D4 = DecimalFormat.getInstance();
+ D4.setMaximumFractionDigits(3);
+ D4.setMinimumFractionDigits(3);
+ }
+
public String toString() {
- return host.toString();
+ return host.toString() + ":" + D4.format(score) + "(" + D4.format(tscore) + "):"+load;
}
public int compareTo(Object o) {
WeightedHost other = (WeightedHost) o;
int r = score.compareTo(other.score);
if (r == 0) {
- //arbitrary ordering on the contact
+ // arbitrary ordering on the contact
return System.identityHashCode(host) - System.identityHashCode(other.host);
}
else {
return r;
}
}
+
+ public double getDelayedDelta() {
+ return delayedDelta;
+ }
+
+ public void setDelayedDelta(double delayedDelta) {
+ this.delayedDelta = delayedDelta;
+ }
+
+
}
\ No newline at end of file
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-11-30 22:29:56 UTC (rev 1439)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-11-30 22:31:21 UTC (rev 1440)
@@ -27,7 +27,6 @@
import org.globus.cog.karajan.workflow.KarajanRuntimeException;
public class WeightedHostScoreScheduler extends LateBindingScheduler {
-
private static final Logger logger = Logger.getLogger(WeightedHostScoreScheduler.class);
public static final int POLICY_WEIGHTED_RANDOM = 0;
@@ -40,43 +39,40 @@
public static final String FACTOR_SUCCESS = "successFactor";
public static final String FACTOR_FAILURE = "failureFactor";
public static final String SCORE_HIGH_CAP = "scoreHighCap";
- public static final String SCORE_LOW_CAP = "scoreLowCap";
- public static final String NORMALIZATION_DELAY = "normalizationDelay";
public static final String POLICY = "policy";
private WeightedHostSet sorted;
private int policy;
- private int delay;
/*
* These field names must match the property names
*/
private double connectionRefusedFactor, connectionTimeoutFactor, jobSubmissionTaskLoadFactor,
transferTaskLoadFactor, fileOperationTaskLoadFactor, successFactor, failureFactor,
- scoreHighCap, scoreLowCap;
- private int normalizationDelay;
+ scoreHighCap;
+ private int jobThrottle;
+
public WeightedHostScoreScheduler() {
policy = POLICY_WEIGHTED_RANDOM;
setDefaultFactors();
}
protected final void setDefaultFactors() {
- connectionRefusedFactor = 0.1;
- connectionTimeoutFactor = 0.05;
- jobSubmissionTaskLoadFactor = 0.9;
- transferTaskLoadFactor = 0.9;
- fileOperationTaskLoadFactor = 0.95;
- successFactor = 1.01;
- failureFactor = 0.9;
+ connectionRefusedFactor = -10;
+ connectionTimeoutFactor = -20;
+ jobSubmissionTaskLoadFactor = -0.2;
+ transferTaskLoadFactor = -0.2;
+ fileOperationTaskLoadFactor = -0.01;
+ successFactor = 1;
+ failureFactor = -0.1;
scoreHighCap = 100;
- scoreLowCap = 0.01;
- normalizationDelay = 100;
+ jobThrottle = 2;
}
public void setResources(ContactSet grid) {
super.setResources(grid);
- sorted = new WeightedHostSet();
+ sorted = new WeightedHostSet(scoreHighCap);
Iterator i = grid.getContacts().iterator();
while (i.hasNext()) {
addToSorted(new WeightedHost((BoundContact) i.next()));
@@ -92,78 +88,100 @@
if (logger.isDebugEnabled()) {
logger.debug("multiplyScore(" + wh + ", " + factor + ")");
}
- double ns = checkCaps(score * factor);
+ double ns = factor(score, factor);
sorted.changeScore(wh, ns);
if (logger.isDebugEnabled()) {
logger.debug("Old score: " + score + ", new score: " + ns);
}
}
- protected double checkCaps(double score) {
- if (score > scoreHighCap) {
- return scoreHighCap;
- }
- else if (score < scoreLowCap) {
- return scoreLowCap;
- }
- else {
- return score;
- }
+ protected synchronized void multiplyScoreLater(WeightedHost wh, double factor) {
+ wh.setDelayedDelta(wh.getDelayedDelta() + factor);
}
+ protected double factor(double score, double factor) {
+ return score + factor;
+ }
+
protected synchronized BoundContact getNextContact(TaskConstraints t)
throws NoFreeResourceException {
checkGlobalLoadConditions();
BoundContact contact;
WeightedHostSet s = sorted;
+ WeightedHost selected = null;
s = constrain(s, getConstraintChecker(), t);
double sum = s.getSum();
if (policy == POLICY_WEIGHTED_RANDOM) {
double rand = Math.random() * sum;
+ if (logger.isInfoEnabled() && !s.isEmpty()) {
+ logger.info("Sorted: " + s);
+ }
if (logger.isDebugEnabled()) {
- logger.debug("Sorted: " + s);
logger.debug("Rand: " + rand + ", sum: " + sum);
}
Iterator i = s.iterator();
-
+
sum = 0;
while (i.hasNext()) {
WeightedHost wh = (WeightedHost) i.next();
- sum += wh.getScore();
+ sum += wh.getTScore();
if (sum >= rand) {
- return wh.getHost();
+ selected = wh;
+ break;
}
}
- normalize();
- contact = s.last().getHost();
+ if (selected == null) {
+ if (s.isEmpty()) {
+ throw new NoFreeResourceException();
+ }
+ else {
+ selected = s.last();
+ }
+ }
}
else if (policy == POLICY_BEST_SCORE) {
- normalize();
- contact = s.last().getHost();
+ selected = s.last();
}
else {
throw new KarajanRuntimeException("Invalid policy number: " + policy);
}
if (logger.isDebugEnabled()) {
- logger.debug("Next contact: " + contact);
+ logger.debug("Next contact: " + selected.getHost());
}
- return contact;
+ selected.changeLoad(1);
+ selected.setDelayedDelta(successFactor);
+ return selected.getHost();
}
+ public void releaseContact(BoundContact contact) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Releasing contact " + contact);
+ }
+ super.releaseContact(contact);
+ WeightedHost wh = sorted.findHost(contact);
+ if (wh != null) {
+ wh.changeLoad(-1);
+ sorted.changeScore(wh, wh.getScore() + wh.getDelayedDelta());
+ }
+ else {
+ logger.warn("ghost contact (" + contact + ") in releaseContact");
+ }
+ }
+
protected WeightedHostSet constrain(WeightedHostSet s, ResourceConstraintChecker rcc,
TaskConstraints tc) {
if (rcc == null) {
return s;
}
else {
- WeightedHostSet ns = new WeightedHostSet();
+ WeightedHostSet ns = new WeightedHostSet(scoreHighCap);
Iterator i = s.iterator();
while (i.hasNext()) {
WeightedHost wh = (WeightedHost) i.next();
- if (rcc.checkConstraints(wh.getHost(), tc)) {
+ if (rcc.checkConstraints(wh.getHost(), tc) && notOverloaded(wh)) {
ns.add(wh);
}
}
@@ -171,26 +189,17 @@
}
}
- protected void normalize() {
- delay++;
- if (delay > normalizationDelay) {
- if (logger.isDebugEnabled()) {
- logger.debug("Normalizing...");
- logger.debug("Before normalization: " + sorted);
- }
- delay = 0;
- sorted.normalize(1);
- if (logger.isDebugEnabled()) {
- logger.debug("After normalization: " + sorted);
- }
- }
+ protected boolean notOverloaded(WeightedHost wh) {
+ double score = wh.getTScore();
+ int load = wh.getLoad();
+ return load < jobThrottle * score + 2;
}
private static String[] propertyNames;
private static final String[] myPropertyNames = new String[] { POLICY,
FACTOR_CONNECTION_REFUSED, FACTOR_CONNECTION_TIMEOUT, FACTOR_SUBMISSION_TASK_LOAD,
FACTOR_TRANSFER_TASK_LOAD, FACTOR_FILEOP_TASK_LOAD, FACTOR_FAILURE, FACTOR_SUCCESS,
- SCORE_HIGH_CAP, SCORE_LOW_CAP, NORMALIZATION_DELAY };
+ SCORE_HIGH_CAP };
private static Set propertyNamesSet;
static {
@@ -262,19 +271,19 @@
}
else if (code == Status.COMPLETED) {
factorSubmission(t, contacts, -1);
- factorMultiple(contacts, successFactor);
+ factorMultipleLater(contacts, successFactor);
}
else if (code == Status.FAILED) {
- factorMultiple(contacts, failureFactor);
+ factorMultipleLater(contacts, failureFactor);
Exception ex = e.getStatus().getException();
if (ex != null) {
String exs = ex.toString();
if (exs.indexOf("Connection refused") >= 0
|| exs.indexOf("connection refused") >= 0) {
- factorMultiple(contacts, connectionRefusedFactor);
+ factorMultipleLater(contacts, connectionRefusedFactor);
}
else if (exs.indexOf("timeout") >= 0) {
- factorMultiple(contacts, connectionTimeoutFactor);
+ factorMultipleLater(contacts, connectionTimeoutFactor);
}
}
}
@@ -305,7 +314,7 @@
return x;
}
else if (exp == -1) {
- return 1 / x;
+ return -x;
}
else {
throw new IllegalArgumentException();
@@ -321,4 +330,14 @@
}
}
}
+
+ private void factorMultipleLater(Contact[] contacts, double factor) {
+ for (int i = 0; i < contacts.length; i++) {
+ BoundContact bc = (BoundContact) contacts[i];
+ WeightedHost wh = sorted.findHost(bc);
+ if (wh != null) {
+ multiplyScoreLater(wh, factor);
+ }
+ }
+ }
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java 2006-11-30 22:29:56 UTC (rev 1439)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java 2006-11-30 22:31:21 UTC (rev 1440)
@@ -20,11 +20,13 @@
private TreeSet scores;
private Map weightedHosts;
private double sum;
+ private double scoreHighCap;
- public WeightedHostSet() {
+ public WeightedHostSet(double scoreHighCap) {
init();
+ this.scoreHighCap = scoreHighCap;
}
-
+
protected void init() {
scores = new TreeSet();
weightedHosts = new HashMap();
@@ -34,16 +36,16 @@
public synchronized void add(WeightedHost wh) {
scores.add(wh);
weightedHosts.put(wh.getHost(), wh);
- sum += wh.getScore();
+ sum += wh.getTScore();
}
-
+
public synchronized void changeScore(WeightedHost wh, double newScore) {
scores.remove(wh);
- sum -= wh.getScore();
- WeightedHost nwh = new WeightedHost(wh.getHost(), newScore);
+ sum -= wh.getTScore();
+ WeightedHost nwh = new WeightedHost(wh.getHost(), newScore, wh.getLoad());
weightedHosts.put(wh.getHost(), nwh);
scores.add(nwh);
- sum += newScore;
+ sum += nwh.getTScore();
}
public synchronized double remove(WeightedHost wh) {
@@ -57,7 +59,7 @@
}
public Iterator iterator() {
- return scores.iterator();
+ return scores.iterator();
}
public WeightedHost last() {
@@ -85,12 +87,15 @@
scores = new TreeSet();
while (i.hasNext()) {
WeightedHost wh = (WeightedHost) i.next();
- WeightedHost nwh = new WeightedHost(wh.getHost(), wh.getScore()
- * renormalizationFactor);
+ WeightedHost nwh = new WeightedHost(wh.getHost(), wh.getScore() * renormalizationFactor);
add(nwh);
}
}
+ public boolean isEmpty() {
+ return scores.isEmpty();
+ }
+
public String toString() {
return scores.toString();
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2006-12-07 17:26:03
|
Revision: 1449
http://svn.sourceforge.net/cogkit/?rev=1449&view=rev
Author: hategan
Date: 2006-12-07 09:26:00 -0800 (Thu, 07 Dec 2006)
Log Message:
-----------
fail when no resources can be found for a task and no tasks are running
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java 2006-12-07 17:24:30 UTC (rev 1448)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java 2006-12-07 17:26:00 UTC (rev 1449)
@@ -16,6 +16,10 @@
public class ContactAllocationTask extends TaskImpl {
private BoundContact contact;
private VariableStack stack;
+
+ public ContactAllocationTask() {
+ setName("Contact allocation task");
+ }
public BoundContact getContact() {
return contact;
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2006-12-07 17:24:30 UTC (rev 1448)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2006-12-07 17:26:00 UTC (rev 1449)
@@ -152,7 +152,14 @@
}
protected TaskConstraints getTaskConstraints(Task t) {
- return null;
+ Object constraints = super.getConstraints(t);
+ if (constraints instanceof Contact[]) {
+ Contact[] c = (Contact[]) constraints;
+ if (c.length > 0 && c[0] != null) {
+ return c[0].getConstraints();
+ }
+ }
+ return null;
}
public void enqueue(Task task, Object constraints) {
@@ -236,6 +243,10 @@
success = true;
}
catch (NoFreeResourceException e) {
+ if (running == 0) {
+ failUnsubmittedTask(t, "Could not find any valid host for task \"" + t
+ + "\" with constraints " + getTaskConstraints(t), e);
+ }
}
catch (Exception e) {
failTask(t, "The scheduler could not execute the task", e);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2006-12-28 20:37:03
|
Revision: 1495
http://svn.sourceforge.net/cogkit/?rev=1495&view=rev
Author: hategan
Date: 2006-12-28 12:37:00 -0800 (Thu, 28 Dec 2006)
Log Message:
-----------
clean constraints; distinguish between no resources because of unsatisfiable constraints or load conditions
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Added Paths:
-----------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/NoSuchResourceException.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2006-12-28 20:35:59 UTC (rev 1494)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2006-12-28 20:37:00 UTC (rev 1495)
@@ -211,6 +211,12 @@
return constraints.get(task);
}
}
+
+ protected void removeConstraints(Task task) {
+ synchronized(constraints) {
+ constraints.remove(task);
+ }
+ }
public void addTaskTransformer(TaskTransformer taskTransformer) {
this.taskTransformers.add(taskTransformer);
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java 2006-12-28 20:35:59 UTC (rev 1494)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java 2006-12-28 20:37:00 UTC (rev 1495)
@@ -65,21 +65,29 @@
checkGlobalLoadConditions();
int initial = contactCursor;
ContactSet resources = getResources();
- while (!checkLoad(resources.get(contactCursor))
- || !checkConstraints(resources.get(contactCursor), t)) {
+ boolean incompatibleConstraints = true;
+ while (true) {
+ if (checkConstraints(resources.get(contactCursor), t)) {
+ incompatibleConstraints = false;
+ if (checkLoad(resources.get(contactCursor))) {
+ break;
+ }
+ }
incContactCursor();
if (contactCursor == initial) {
- logger.debug("No free resources");
- throw new NoFreeResourceException("No free hosts available");
+ if (incompatibleConstraints) {
+ throw new NoSuchResourceException();
+ }
+ else {
+ throw new NoFreeResourceException("No free hosts available");
+ }
}
}
- if (initial == contactCursor) {
- incContactCursor();
- }
BoundContact contact = getResources().get(contactCursor);
if (logger.isDebugEnabled()) {
logger.debug("Contact: " + contact);
}
+ incContactCursor();
return contact;
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2006-12-28 20:35:59 UTC (rev 1494)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2006-12-28 20:37:00 UTC (rev 1495)
@@ -235,22 +235,26 @@
break;
}
Task t = (Task) getJobQueue().get(index);
+ boolean remove = true;
try {
submitUnbound(t);
- synchronized (this) {
- getJobQueue().remove(index);
- }
success = true;
}
- catch (NoFreeResourceException e) {
- if (running == 0) {
- failUnsubmittedTask(t, "Could not find any valid host for task \"" + t
+ catch (NoSuchResourceException e) {
+ failTask(t, "Could not find any valid host for task \"" + t
+ "\" with constraints " + getTaskConstraints(t), e);
- }
}
+ catch (NoFreeResourceException e) {
+ remove = false;
+ }
catch (Exception e) {
failTask(t, "The scheduler could not execute the task", e);
}
+ if (remove) {
+ synchronized (this) {
+ getJobQueue().remove(index);
+ }
+ }
index++;
}
}
@@ -277,14 +281,6 @@
s.setMessage(message);
s.setException(e);
t.setStatus(s);
- }
-
- protected void failUnsubmittedTask(Task t, String message, Exception e) {
- Status s = new StatusImpl();
- s.setStatusCode(Status.FAILED);
- s.setMessage(message);
- s.setException(e);
- t.setStatus(s);
fireJobStatusChangeEvent(t, s);
}
@@ -327,7 +323,7 @@
services[i] = resolveService((BoundContact) contacts[i], t.getType());
}
if (services[i] == null) {
- failUnsubmittedTask(t, "Could not find a suitable service/provider for host "
+ failTask(t, "Could not find a suitable service/provider for host "
+ contacts[i], null);
return;
}
@@ -348,11 +344,10 @@
throw e;
}
catch (Exception e) {
- if (e instanceof NullPointerException) {
- e.printStackTrace();
- }
- logger.debug("Scheduler exception: job =" + t.getIdentity().getValue() + ", status = "
+ if (logger.isDebugEnabled()) {
+ logger.debug("Scheduler exception: job =" + t.getIdentity().getValue() + ", status = "
+ t.getStatus(), e);
+ }
Status status = t.getStatus();
status.setPrevStatusCode(status.getStatusCode());
status.setStatusCode(Status.FAILED);
@@ -556,6 +551,7 @@
synchronized (taskContacts) {
taskContacts.remove(task);
}
+ removeConstraints(task);
for (int i = 0; i < contacts.length; i++) {
BoundContact c = (BoundContact) contacts[i];
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/NoSuchResourceException.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/NoSuchResourceException.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/NoSuchResourceException.java 2006-12-28 20:37:00 UTC (rev 1495)
@@ -0,0 +1,30 @@
+// ----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Sep 2, 2004
+ */
+package org.globus.cog.karajan.scheduler;
+
+public class NoSuchResourceException extends NoFreeResourceException {
+ private static final long serialVersionUID = 4757539377378613461L;
+
+ public NoSuchResourceException() {
+ super();
+ }
+
+ public NoSuchResourceException(String message) {
+ super(message);
+ }
+
+ public NoSuchResourceException(Throwable cause) {
+ super(cause);
+ }
+
+ public NoSuchResourceException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
\ No newline at end of file
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-12-28 20:35:59 UTC (rev 1494)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-12-28 20:37:00 UTC (rev 1495)
@@ -112,6 +112,16 @@
WeightedHost selected = null;
s = constrain(s, getConstraintChecker(), t);
+
+ if (s.isEmpty()) {
+ throw new NoSuchResourceException();
+ }
+
+ removeOverloaded(s);
+
+ if (s.isEmpty()) {
+ throw new NoFreeResourceException();
+ }
double sum = s.getSum();
if (policy == POLICY_WEIGHTED_RANDOM) {
@@ -134,12 +144,7 @@
}
}
if (selected == null) {
- if (s.isEmpty()) {
- throw new NoFreeResourceException();
- }
- else {
- selected = s.last();
- }
+ selected = s.last();
}
}
else if (policy == POLICY_BEST_SCORE) {
@@ -181,18 +186,28 @@
Iterator i = s.iterator();
while (i.hasNext()) {
WeightedHost wh = (WeightedHost) i.next();
- if (rcc.checkConstraints(wh.getHost(), tc) && notOverloaded(wh)) {
+ if (rcc.checkConstraints(wh.getHost(), tc)) {
ns.add(wh);
}
}
return ns;
}
}
+
+ protected void removeOverloaded(WeightedHostSet s) {
+ Iterator i = s.iterator();
+ while (i.hasNext()) {
+ WeightedHost wh = (WeightedHost) i.next();
+ if (overloaded(wh)) {
+ i.remove();
+ }
+ }
+ }
- protected boolean notOverloaded(WeightedHost wh) {
+ protected boolean overloaded(WeightedHost wh) {
double score = wh.getTScore();
int load = wh.getLoad();
- return load < jobThrottle * score + 2;
+ return !(load < jobThrottle * score + 2);
}
private static String[] propertyNames;
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2007-01-04 21:40:17
|
Revision: 1521
http://svn.sourceforge.net/cogkit/?rev=1521&view=rev
Author: hategan
Date: 2007-01-04 13:40:12 -0800 (Thu, 04 Jan 2007)
Log Message:
-----------
fixed a few issues
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2007-01-04 04:55:07 UTC (rev 1520)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2007-01-04 21:40:12 UTC (rev 1521)
@@ -6,6 +6,7 @@
package org.globus.cog.karajan.scheduler;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -49,7 +50,7 @@
private final Map constraints;
private List taskTransformers, failureHandlers;
-
+
private ResourceConstraintChecker constraintChecker;
public AbstractScheduler() {
@@ -125,29 +126,38 @@
public void addJobStatusListener(StatusListener l, Task task) {
List jobListeners;
- if (listeners.containsKey(task)) {
- jobListeners = (List) listeners.get(task);
+ synchronized (listeners) {
+ if (listeners.containsKey(task)) {
+ jobListeners = (List) listeners.get(task);
+ }
+ else {
+ jobListeners = new ArrayList();
+ listeners.put(task, jobListeners);
+ }
+ jobListeners.add(l);
}
- else {
- jobListeners = new LinkedList();
- listeners.put(task, jobListeners);
- }
- jobListeners.add(l);
}
public void removeJobStatusListener(StatusListener l, Task task) {
- if (listeners.containsKey(task)) {
- List jobListeners = (List) listeners.get(task);
- jobListeners.remove(l);
- if (jobListeners.size() == 0) {
- listeners.remove(task);
+ synchronized (listeners) {
+ if (listeners.containsKey(task)) {
+ List jobListeners = (List) listeners.get(task);
+ jobListeners.remove(l);
+ if (jobListeners.size() == 0) {
+ listeners.remove(task);
+ }
}
}
}
public void fireJobStatusChangeEvent(StatusEvent e) {
- if (listeners.containsKey(e.getSource())) {
- List jobListeners = new LinkedList((List) listeners.get(e.getSource()));
+ List jobListeners = null;
+ synchronized(listeners) {
+ if (listeners.containsKey(e.getSource())) {
+ jobListeners = new ArrayList((List) listeners.get(e.getSource()));
+ }
+ }
+ if (jobListeners != null) {
Iterator i = jobListeners.iterator();
while (i.hasNext()) {
((StatusListener) i.next()).statusChanged(e);
@@ -201,19 +211,19 @@
}
protected void setConstraints(Task task, Object constraint) {
- synchronized(constraints) {
+ synchronized (constraints) {
constraints.put(task, constraint);
}
}
protected Object getConstraints(Task task) {
- synchronized(constraints) {
+ synchronized (constraints) {
return constraints.get(task);
}
}
-
+
protected void removeConstraints(Task task) {
- synchronized(constraints) {
+ synchronized (constraints) {
constraints.remove(task);
}
}
@@ -233,7 +243,7 @@
((TaskTransformer) i.next()).transformTask(t, contacts, services);
}
}
-
+
protected boolean runFailureHandlers(Task t) {
Iterator i = failureHandlers.iterator();
while (i.hasNext()) {
@@ -244,7 +254,7 @@
}
return false;
}
-
+
public void addFailureHandler(FailureHandler handler) {
failureHandlers.add(handler);
}
@@ -256,7 +266,7 @@
public void setConstraintChecker(ResourceConstraintChecker constraintChecker) {
this.constraintChecker = constraintChecker;
}
-
+
protected boolean checkConstraints(BoundContact resource, TaskConstraints tc) {
if (constraintChecker == null) {
return true;
@@ -265,7 +275,7 @@
return constraintChecker.checkConstraints(resource, tc);
}
}
-
+
protected List checkConstraints(List resources, TaskConstraints tc) {
if (constraintChecker == null) {
return resources;
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2007-01-04 04:55:07 UTC (rev 1520)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2007-01-04 21:40:12 UTC (rev 1521)
@@ -40,6 +40,7 @@
public static final String FACTOR_FAILURE = "failureFactor";
public static final String SCORE_HIGH_CAP = "scoreHighCap";
public static final String POLICY = "policy";
+ public static final String JOB_THROTTLE = "jobThrottle";
private WeightedHostSet sorted;
private int policy;
@@ -112,13 +113,13 @@
WeightedHost selected = null;
s = constrain(s, getConstraintChecker(), t);
-
+
if (s.isEmpty()) {
throw new NoSuchResourceException();
}
-
- removeOverloaded(s);
-
+
+ s = removeOverloaded(s);
+
if (s.isEmpty()) {
throw new NoFreeResourceException();
}
@@ -161,7 +162,7 @@
return selected.getHost();
}
- public void releaseContact(BoundContact contact) {
+ public synchronized void releaseContact(BoundContact contact) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing contact " + contact);
}
@@ -193,15 +194,29 @@
return ns;
}
}
-
- protected void removeOverloaded(WeightedHostSet s) {
- Iterator i = s.iterator();
- while (i.hasNext()) {
- WeightedHost wh = (WeightedHost) i.next();
- if (overloaded(wh)) {
- i.remove();
+
+ protected WeightedHostSet removeOverloaded(WeightedHostSet s) {
+ if (s == sorted) {
+ WeightedHostSet ns = new WeightedHostSet(scoreHighCap);
+ Iterator i = s.iterator();
+ while (i.hasNext()) {
+ WeightedHost wh = (WeightedHost) i.next();
+ if (!overloaded(wh)) {
+ ns.add(wh);
+ }
}
+ return ns;
}
+ else {
+ Iterator i = s.iterator();
+ while (i.hasNext()) {
+ WeightedHost wh = (WeightedHost) i.next();
+ if (overloaded(wh)) {
+ i.remove();
+ }
+ }
+ return s;
+ }
}
protected boolean overloaded(WeightedHost wh) {
@@ -214,7 +229,7 @@
private static final String[] myPropertyNames = new String[] { POLICY,
FACTOR_CONNECTION_REFUSED, FACTOR_CONNECTION_TIMEOUT, FACTOR_SUBMISSION_TASK_LOAD,
FACTOR_TRANSFER_TASK_LOAD, FACTOR_FILEOP_TASK_LOAD, FACTOR_FAILURE, FACTOR_SUCCESS,
- SCORE_HIGH_CAP };
+ SCORE_HIGH_CAP, JOB_THROTTLE };
private static Set propertyNamesSet;
static {
@@ -248,6 +263,9 @@
throw new KarajanRuntimeException("Unknown policy type: " + value);
}
}
+ else if (JOB_THROTTLE.equals(name)) {
+ jobThrottle = TypeUtil.toInt(value);
+ }
else {
double val = TypeUtil.toDouble(value);
try {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2007-01-05 03:53:12
|
Revision: 1525
http://svn.sourceforge.net/cogkit/?rev=1525&view=rev
Author: hategan
Date: 2007-01-04 19:53:10 -0800 (Thu, 04 Jan 2007)
Log Message:
-----------
some adjustments
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2007-01-05 03:52:39 UTC (rev 1524)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2007-01-05 03:53:10 UTC (rev 1525)
@@ -277,6 +277,7 @@
protected void failTask(Task t, String message, Exception e) {
Status s = new StatusImpl();
+ s.setPrevStatusCode(t.getStatus().getStatusCode());
s.setStatusCode(Status.FAILED);
s.setMessage(message);
s.setException(e);
@@ -348,13 +349,7 @@
logger.debug("Scheduler exception: job =" + t.getIdentity().getValue() + ", status = "
+ t.getStatus(), e);
}
- Status status = t.getStatus();
- status.setPrevStatusCode(status.getStatusCode());
- status.setStatusCode(Status.FAILED);
- status.setException(e);
- status.setMessage(e.toString());
- StatusEvent se = new StatusEvent(t, status);
- fireJobStatusChangeEvent(se);
+ failTask(t, e.toString(), e);
return;
}
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2007-01-05 03:52:39 UTC (rev 1524)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2007-01-05 03:53:10 UTC (rev 1525)
@@ -298,6 +298,11 @@
Task t = (Task) e.getSource();
int code = e.getStatus().getStatusCode();
Contact[] contacts = getContacts(t);
+
+ if (contacts == null) {
+ return;
+ }
+
if (code == Status.SUBMITTED) {
// this isn't reliable
// factorSubmission(t, contacts, 1);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2007-02-28 01:49:50
|
Revision: 1596
http://svn.sourceforge.net/cogkit/?rev=1596&view=rev
Author: hategan
Date: 2007-02-27 17:49:48 -0800 (Tue, 27 Feb 2007)
Log Message:
-----------
fixed a bunch of leaks
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2007-02-28 01:48:31 UTC (rev 1595)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2007-02-28 01:49:48 UTC (rev 1596)
@@ -152,7 +152,7 @@
public void fireJobStatusChangeEvent(StatusEvent e) {
List jobListeners = null;
- synchronized(listeners) {
+ synchronized (listeners) {
if (listeners.containsKey(e.getSource())) {
jobListeners = new ArrayList((List) listeners.get(e.getSource()));
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java 2007-02-28 01:48:31 UTC (rev 1595)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java 2007-02-28 01:49:48 UTC (rev 1596)
@@ -12,9 +12,11 @@
import org.globus.cog.abstraction.impl.common.task.TaskImpl;
import org.globus.cog.karajan.stack.VariableStack;
import org.globus.cog.karajan.util.BoundContact;
+import org.globus.cog.karajan.util.Contact;
public class ContactAllocationTask extends TaskImpl {
private BoundContact contact;
+ private Contact virtualContact;
private VariableStack stack;
public ContactAllocationTask() {
@@ -40,4 +42,12 @@
public int getRequiredServices() {
return 1;
}
+
+ public void setVirtualContact(Contact vc) {
+ this.virtualContact = vc;
+ }
+
+ public Contact getVirtualContact() {
+ return this.virtualContact;
+ }
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2007-02-28 01:48:31 UTC (rev 1595)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2007-02-28 01:49:48 UTC (rev 1596)
@@ -106,12 +106,12 @@
return allocateContact(null);
}
- public synchronized void releaseContact(BoundContact contact) {
- virtualContacts.remove(contact.getHost());
+ public synchronized void releaseContact(Contact contact) {
+ virtualContacts.remove(contact);
tasksFinished = true;
}
- public synchronized BoundContact resolveVirtualContact(Task t, Contact contact)
+ public synchronized BoundContact resolveVirtualContact(Contact contact)
throws NoFreeResourceException {
if (contact.isVirtual()) {
BoundContact next;
@@ -382,7 +382,7 @@
}
else {
if (contact.isVirtual()) {
- boundContact = resolveVirtualContact(t, contact);
+ boundContact = resolveVirtualContact(contact);
}
else {
boundContact = (BoundContact) contact;
@@ -447,6 +447,7 @@
throws TaskSubmissionException {
if (t instanceof ContactAllocationTask) {
((ContactAllocationTask) t).setContact((BoundContact) contacts[0]);
+ removeConstraints(t);
Status status = t.getStatus();
status.setPrevStatusCode(status.getStatusCode());
status.setStatusCode(Status.COMPLETED);
@@ -454,6 +455,14 @@
fireJobStatusChangeEvent(se);
return;
}
+ for (int i = 0; i < contacts.length; i++) {
+ if (!(contacts[i] instanceof BoundContact)) {
+ throw new TaskSubmissionException(
+ "submitBoundToServices called but at least a contact is not bound");
+ }
+ BoundContact c = (BoundContact) contacts[i];
+ c.setActiveTasks(c.getActiveTasks() + 1);
+ }
applyTaskTransformers(t, contacts, services);
t.addStatusListener(this);
TaskHandler handler;
@@ -467,14 +476,6 @@
synchronized (taskContacts) {
taskContacts.put(t, contacts);
}
- for (int i = 0; i < contacts.length; i++) {
- if (!(contacts[i] instanceof BoundContact)) {
- throw new TaskSubmissionException(
- "submitBoundToServices called but at least a contact is not bound");
- }
- BoundContact c = (BoundContact) contacts[i];
- c.setActiveTasks(c.getActiveTasks() + 1);
- }
if (logger.isDebugEnabled()) {
logger.debug("Submitting task " + t);
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java 2007-02-28 01:48:31 UTC (rev 1595)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java 2007-02-28 01:49:48 UTC (rev 1596)
@@ -10,7 +10,6 @@
import org.globus.cog.abstraction.interfaces.StatusListener;
import org.globus.cog.abstraction.interfaces.Task;
-import org.globus.cog.karajan.util.BoundContact;
import org.globus.cog.karajan.util.Contact;
import org.globus.cog.karajan.util.ContactSet;
import org.globus.cog.karajan.util.TaskHandlerWrapper;
@@ -41,7 +40,7 @@
* Can be used to tell the scheduler that a previously allocated contact
* (using allocateContact()) is not used any more.
*/
- void releaseContact(BoundContact sc);
+ void releaseContact(Contact sc);
/**
* Sets the set of resources that the scheduler will use
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2007-02-28 01:48:31 UTC (rev 1595)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2007-02-28 01:49:48 UTC (rev 1596)
@@ -78,9 +78,9 @@
public void setResources(ContactSet grid) {
super.setResources(grid);
- if (grid.getContacts() == null) {
- return;
- }
+ if (grid.getContacts() == null) {
+ return;
+ }
sorted = new WeightedHostSet(scoreHighCap);
Iterator i = grid.getContacts().iterator();
while (i.hasNext()) {
@@ -184,26 +184,33 @@
throw new KarajanRuntimeException("Invalid policy number: " + policy);
}
if (logger.isDebugEnabled()) {
- logger.debug("Next contact: " + selected.getHost());
+ logger.debug("Next contact: " + selected);
}
sorted.changeLoad(selected, 1);
selected.setDelayedDelta(successFactor);
return selected.getHost();
}
- public synchronized void releaseContact(BoundContact contact) {
+ public synchronized void releaseContact(Contact contact) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing contact " + contact);
}
- super.releaseContact(contact);
- WeightedHost wh = sorted.findHost(contact);
- if (wh != null) {
- change = true;
- sorted.changeLoad(wh, -1);
- sorted.changeScore(wh, wh.getScore() + wh.getDelayedDelta());
+ try {
+ BoundContact bc = this.resolveVirtualContact(contact);
+ super.releaseContact(contact);
+
+ WeightedHost wh = sorted.findHost(bc);
+ if (wh != null) {
+ change = true;
+ sorted.changeLoad(wh, -1);
+ sorted.changeScore(wh, wh.getScore() + wh.getDelayedDelta());
+ }
+ else {
+ logger.warn("ghost contact (" + contact + ") in releaseContact");
+ }
}
- else {
- logger.warn("ghost contact (" + contact + ") in releaseContact");
+ catch (NoFreeResourceException e) {
+ logger.warn("Failed to release contact " + contact, e);
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2007-08-24 00:51:43
|
Revision: 1723
http://cogkit.svn.sourceforge.net/cogkit/?rev=1723&view=rev
Author: hategan
Date: 2007-08-23 17:51:42 -0700 (Thu, 23 Aug 2007)
Log Message:
-----------
some failures didn't properly clean up
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2007-08-24 00:50:52 UTC (rev 1722)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2007-08-24 00:51:42 UTC (rev 1723)
@@ -9,8 +9,10 @@
*/
package org.globus.cog.karajan.scheduler;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
@@ -297,6 +299,9 @@
}
protected void failTask(Task t, String message, Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Failing task " + t + " because of " + message, e);
+ }
Status s = new StatusImpl();
s.setPrevStatusCode(t.getStatus().getStatusCode());
s.setStatusCode(Status.FAILED);
@@ -305,6 +310,8 @@
t.setStatus(s);
fireJobStatusChangeEvent(t, s);
}
+
+ private List contactTran = new ArrayList();
void submitUnbound(Task t) throws NoFreeResourceException {
try {
@@ -312,6 +319,7 @@
return;
}
checkTaskLoadConditions(t);
+ contactTran.clear();
Service[] services = new Service[t.getRequiredServices()];
Contact[] contacts;
Object constraints = getConstraints(t);
@@ -319,6 +327,7 @@
contacts = (Contact[]) constraints;
if (contacts == null) {
contacts = new Contact[] { this.getNextContact(t) };
+ contactTran.add(contacts[0]);
}
}
else {
@@ -326,13 +335,15 @@
for (int i = 0; i < t.getRequiredServices(); i++) {
if (t.getService(i) == null) {
contacts[i] = this.getNextContact(t);
+ contactTran.add(contacts[i]);
}
}
}
-
+
for (int i = 0; i < services.length; i++) {
if (contacts[i] != null && contacts[i].isVirtual()) {
contacts[i] = resolveContact(t, contacts[i]);
+ contactTran.add(contacts[i]);
}
try {
services[i] = t.getService(i);
@@ -357,11 +368,15 @@
((JobSpecification) t.getSpecification()).setAttribute("project", project);
}
}
-
+
submitBoundToServices(t, contacts, services);
logger.debug("No host specified");
}
catch (NoFreeResourceException e) {
+ Iterator i = contactTran.iterator();
+ while (i.hasNext()) {
+ releaseContact((Contact) i.next());
+ }
throw e;
}
catch (Exception e) {
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2007-08-24 00:50:52 UTC (rev 1722)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2007-08-24 00:51:42 UTC (rev 1723)
@@ -109,7 +109,7 @@
wh.setDelayedDelta(wh.getDelayedDelta() + factor);
}
- protected double factor(double score, double factor) {
+ protected final double factor(double score, double factor) {
return score + factor;
}
@@ -186,10 +186,13 @@
if (logger.isDebugEnabled()) {
logger.debug("Next contact: " + selected);
}
+
sorted.changeLoad(selected, 1);
selected.setDelayedDelta(successFactor);
return selected.getHost();
}
+
+
public synchronized void releaseContact(Contact contact) {
if (logger.isDebugEnabled()) {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2007-08-28 16:38:47
|
Revision: 1728
http://cogkit.svn.sourceforge.net/cogkit/?rev=1728&view=rev
Author: hategan
Date: 2007-08-28 09:38:34 -0700 (Tue, 28 Aug 2007)
Log Message:
-----------
accept "off" as a valid throttle value
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2007-08-24 02:55:34 UTC (rev 1727)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2007-08-28 16:38:34 UTC (rev 1728)
@@ -32,6 +32,8 @@
public abstract class AbstractScheduler extends Thread implements Scheduler {
private static final Logger logger = Logger.getLogger(AbstractScheduler.class);
+
+ public static final int THROTTLE_OFF = 100000000;
private List taskHandlers;
@@ -185,13 +187,22 @@
public void setProperty(String name, Object value) {
if (name.equalsIgnoreCase("maxSimultaneousJobs")) {
logger.debug("Scheduler: setting maxSimultaneousJobs to " + value);
- setMaxSimultaneousJobs(TypeUtil.toInt(value));
+ setMaxSimultaneousJobs(throttleValue(value));
}
else {
throw new IllegalArgumentException("Unsupported property: " + name
+ ". Supported properties are: " + Arrays.asList(this.getPropertyNames()));
}
}
+
+ protected int throttleValue(Object value) {
+ if ("off".equalsIgnoreCase(value.toString())) {
+ return THROTTLE_OFF;
+ }
+ else {
+ return TypeUtil.toInt(value);
+ }
+ }
public Object getProperty(String name) {
return properties.get(name);
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2007-08-24 02:55:34 UTC (rev 1727)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2007-08-28 16:38:34 UTC (rev 1728)
@@ -538,22 +538,22 @@
public void setProperty(String name, Object value) {
if (name.equalsIgnoreCase(JOBS_PER_CPU)) {
logger.debug("Scheduler: setting jobsPerCpu to " + value);
- jobsPerCPU = TypeUtil.toInt(value);
+ jobsPerCPU = throttleValue(value);
}
else if (name.equalsIgnoreCase(SUBMIT_THROTTLE)) {
- submitQueue.setThrottle(TypeUtil.toInt(value));
+ submitQueue.setThrottle(throttleValue(value));
}
else if (name.equalsIgnoreCase(HOST_SUBMIT_THROTTLE)) {
- submitQueue.setHostThrottle(TypeUtil.toInt(value));
+ submitQueue.setHostThrottle(throttleValue(value));
}
else if (name.equalsIgnoreCase(MAX_TRANSFERS)) {
- maxTransfers = TypeUtil.toInt(value);
+ maxTransfers = throttleValue(value);
}
else if (name.equalsIgnoreCase(SSH_INITIAL_RATE)) {
sshInitialRate = TypeUtil.toInt(value);
}
else if (name.equalsIgnoreCase(MAX_FILE_OPERATIONS)) {
- maxFileOperations = TypeUtil.toInt(value);
+ maxFileOperations = throttleValue(value);
}
else {
super.setProperty(name, value);
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2007-08-24 02:55:34 UTC (rev 1727)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2007-08-28 16:38:34 UTC (rev 1728)
@@ -298,7 +298,7 @@
}
}
else if (JOB_THROTTLE.equals(name)) {
- jobThrottle = TypeUtil.toInt(value);
+ jobThrottle = throttleValue(value);
}
else {
double val = TypeUtil.toDouble(value);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-02-12 17:35:17
|
Revision: 1896
http://cogkit.svn.sourceforge.net/cogkit/?rev=1896&view=rev
Author: hategan
Date: 2008-02-12 09:35:15 -0800 (Tue, 12 Feb 2008)
Log Message:
-----------
added method to access task constraints
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2008-02-12 17:33:40 UTC (rev 1895)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2008-02-12 17:35:15 UTC (rev 1896)
@@ -221,13 +221,13 @@
return combined;
}
- protected void setConstraints(Task task, Object constraint) {
+ public void setConstraints(Task task, Object constraint) {
synchronized (constraints) {
constraints.put(task, constraint);
}
}
- protected Object getConstraints(Task task) {
+ public Object getConstraints(Task task) {
synchronized (constraints) {
return constraints.get(task);
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java 2008-02-12 17:33:40 UTC (rev 1895)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java 2008-02-12 17:35:15 UTC (rev 1896)
@@ -102,4 +102,6 @@
* Allows handling task failures at the scheduler level
*/
void addFailureHandler(FailureHandler handler);
+
+ Object getConstraints(Task task);
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-02-12 17:43:48
|
Revision: 1899
http://cogkit.svn.sourceforge.net/cogkit/?rev=1899&view=rev
Author: hategan
Date: 2008-02-12 09:43:46 -0800 (Tue, 12 Feb 2008)
Log Message:
-----------
jobThrottle is now a float; the time it takes to submit a job is now part of the score feedback loop. By default it is set to stabilize at a 20s submission time
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2008-02-12 17:36:49 UTC (rev 1898)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2008-02-12 17:43:46 UTC (rev 1899)
@@ -197,12 +197,21 @@
protected int throttleValue(Object value) {
if ("off".equalsIgnoreCase(value.toString())) {
- return THROTTLE_OFF;
+ return THROTTLE_OFF;
}
else {
return TypeUtil.toInt(value);
}
}
+
+ protected float floatThrottleValue(Object value) {
+ if ("off".equalsIgnoreCase(value.toString())) {
+ return THROTTLE_OFF;
+ }
+ else {
+ return (float) TypeUtil.toDouble(value);
+ }
+ }
public Object getProperty(String name) {
return properties.get(name);
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java 2008-02-12 17:36:49 UTC (rev 1898)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java 2008-02-12 17:43:46 UTC (rev 1899)
@@ -20,17 +20,17 @@
private double tscore;
private int load;
private double delayedDelta;
- private int jobThrottle;
+ private float jobThrottle;
- public WeightedHost(BoundContact contact, int jobThrottle) {
+ public WeightedHost(BoundContact contact, float jobThrottle) {
this(contact, 0.0, jobThrottle);
}
- public WeightedHost(BoundContact contact, double score, int jobThrottle) {
+ public WeightedHost(BoundContact contact, double score, float jobThrottle) {
this(contact, score, 0, jobThrottle);
}
- public WeightedHost(BoundContact contact, double score, int load, int jobThrottle) {
+ public WeightedHost(BoundContact contact, double score, int load, float jobThrottle) {
this.host = contact;
setScore(score);
this.load = load;
@@ -129,7 +129,7 @@
return !(load < jobThrottle * tscore + 2);
}
- public int getJobThrottle() {
+ public float getJobThrottle() {
return jobThrottle;
}
}
\ No newline at end of file
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2008-02-12 17:36:49 UTC (rev 1898)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2008-02-12 17:43:46 UTC (rev 1899)
@@ -10,6 +10,7 @@
package org.globus.cog.karajan.scheduler;
import java.lang.reflect.Field;
+import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
@@ -41,6 +42,7 @@
public static final String SCORE_HIGH_CAP = "scoreHighCap";
public static final String POLICY = "policy";
public static final String JOB_THROTTLE = "jobThrottle";
+ public static final String MAX_SUBMISSION_TIME = "maxSubmissionTime";
private WeightedHostSet sorted;
private int policy;
@@ -50,10 +52,12 @@
*/
private double connectionRefusedFactor, connectionTimeoutFactor, jobSubmissionTaskLoadFactor,
transferTaskLoadFactor, fileOperationTaskLoadFactor, successFactor, failureFactor,
- scoreHighCap;
+ scoreHighCap, maxSubmissionTime;
- private int jobThrottle;
+ private float jobThrottle;
+ private double submissionTimeBias, submissionTimeFactor;
+
private boolean change;
private TaskConstraints cachedConstraints;
private boolean cachedLoadState;
@@ -74,8 +78,24 @@
failureFactor = -0.5;
scoreHighCap = 100;
jobThrottle = 2;
+ maxSubmissionTime = 20;
+ updateInternal();
}
+ // This isn't very accurate since it varies by provider
+ // and with submission parallelism
+ // What it means is that if submission time is exactly this much
+ // then a success should increase the score by successFactor
+ public final double BASE_SUBMISSION_TIME = 0.5;
+
+ protected void updateInternal() {
+ if (maxSubmissionTime < BASE_SUBMISSION_TIME) {
+ throw new IllegalArgumentException("maxSubmissionTime must be > " + BASE_SUBMISSION_TIME);
+ }
+ submissionTimeFactor = -successFactor / (maxSubmissionTime - BASE_SUBMISSION_TIME);
+ submissionTimeBias = -BASE_SUBMISSION_TIME * submissionTimeFactor;
+ }
+
public void setResources(ContactSet grid) {
super.setResources(grid);
if (grid.getContacts() == null) {
@@ -92,7 +112,7 @@
sorted.add(wh);
}
- protected synchronized void multiplyScore(WeightedHost wh, double factor) {
+ protected synchronized void factorScore(WeightedHost wh, double factor) {
double score = wh.getScore();
if (logger.isDebugEnabled()) {
logger.debug("multiplyScore(" + wh + ", " + factor + ")");
@@ -105,7 +125,7 @@
}
}
- protected synchronized void multiplyScoreLater(WeightedHost wh, double factor) {
+ protected synchronized void factorScoreLater(WeightedHost wh, double factor) {
wh.setDelayedDelta(wh.getDelayedDelta() + factor);
}
@@ -186,13 +206,11 @@
if (logger.isDebugEnabled()) {
logger.debug("Next contact: " + selected);
}
-
+
sorted.changeLoad(selected, 1);
selected.setDelayedDelta(successFactor);
return selected.getHost();
}
-
-
public synchronized void releaseContact(Contact contact) {
if (logger.isDebugEnabled()) {
@@ -263,7 +281,7 @@
private static final String[] myPropertyNames = new String[] { POLICY,
FACTOR_CONNECTION_REFUSED, FACTOR_CONNECTION_TIMEOUT, FACTOR_SUBMISSION_TASK_LOAD,
FACTOR_TRANSFER_TASK_LOAD, FACTOR_FILEOP_TASK_LOAD, FACTOR_FAILURE, FACTOR_SUCCESS,
- SCORE_HIGH_CAP, JOB_THROTTLE };
+ SCORE_HIGH_CAP, JOB_THROTTLE, MAX_SUBMISSION_TIME };
private static Set propertyNamesSet;
static {
@@ -298,7 +316,7 @@
}
}
else if (JOB_THROTTLE.equals(name)) {
- jobThrottle = throttleValue(value);
+ jobThrottle = floatThrottleValue(value);
}
else {
double val = TypeUtil.toDouble(value);
@@ -315,6 +333,7 @@
throw new KarajanRuntimeException("Failed to set property '" + name + "'", e);
}
}
+ updateInternal();
}
else {
super.setProperty(name, value);
@@ -337,6 +356,8 @@
return;
}
+ checkSubmissionTime(code, e.getStatus(), t, contacts);
+
if (code == Status.SUBMITTED) {
// this isn't reliable
// factorSubmission(t, contacts, 1);
@@ -368,6 +389,33 @@
}
}
+ public static final String TASK_ATTR_SUBMISSION_TIME = "scheduler:submissionTime";
+
+ private void checkSubmissionTime(int code, Status s, Task t, Contact[] contacts) {
+ synchronized (t) {
+ if (t.getType() == Task.JOB_SUBMISSION) {
+ if (code == Status.SUBMITTING) {
+ t.setAttribute(TASK_ATTR_SUBMISSION_TIME, s.getTime());
+ }
+ else {
+ Date st = (Date) t.getAttribute(TASK_ATTR_SUBMISSION_TIME);
+ if (st != null) {
+ Date st2 = s.getTime();
+ long submissionTime = st2.getTime() - st.getTime();
+ t.setAttribute(TASK_ATTR_SUBMISSION_TIME, null);
+ double delta = submissionTimeBias + submissionTimeFactor * submissionTime
+ / 1000;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Submission time for " + t + ": " + submissionTime
+ + "ms. Score delta: " + delta);
+ }
+ factorMultiple(contacts, delta);
+ }
+ }
+ }
+ }
+ }
+
private void factorSubmission(Task t, Contact[] contacts, int exp) {
// I wonder where the line between abstraction and obfuscation is...
if (t.getType() == Task.JOB_SUBMISSION) {
@@ -398,7 +446,7 @@
BoundContact bc = (BoundContact) contacts[i];
WeightedHost wh = sorted.findHost(bc);
if (wh != null) {
- multiplyScore(wh, factor);
+ factorScore(wh, factor);
}
}
}
@@ -408,7 +456,7 @@
BoundContact bc = (BoundContact) contacts[i];
WeightedHost wh = sorted.findHost(bc);
if (wh != null) {
- multiplyScoreLater(wh, factor);
+ factorScoreLater(wh, factor);
}
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-05-08 03:50:24
|
Revision: 2003
http://cogkit.svn.sourceforge.net/cogkit/?rev=2003&view=rev
Author: hategan
Date: 2008-05-07 20:50:15 -0700 (Wed, 07 May 2008)
Log Message:
-----------
make canceling of tasks non-blocking
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
Added Paths:
-----------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/NonBlockingCancel.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2008-05-08 03:49:11 UTC (rev 2002)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2008-05-08 03:50:15 UTC (rev 2003)
@@ -34,6 +34,7 @@
import org.globus.cog.karajan.scheduler.submitQueue.GlobalSubmitQueue;
import org.globus.cog.karajan.scheduler.submitQueue.HostSubmitQueue;
import org.globus.cog.karajan.scheduler.submitQueue.InstanceSubmitQueue;
+import org.globus.cog.karajan.scheduler.submitQueue.NonBlockingCancel;
import org.globus.cog.karajan.scheduler.submitQueue.NonBlockingSubmit;
import org.globus.cog.karajan.scheduler.submitQueue.SubmitQueue;
import org.globus.cog.karajan.util.BoundContact;
@@ -650,13 +651,7 @@
public void cancelTask(Task task) {
TaskHandler handler = getHandler(task);
if (handler != null) {
- try {
- handler.cancel(task);
- }
- catch (Exception e) {
- // force it
- task.setStatus(Status.CANCELED);
- }
+ new NonBlockingCancel(handler, task).go();
}
}
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/NonBlockingCancel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/NonBlockingCancel.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/NonBlockingCancel.java 2008-05-08 03:50:15 UTC (rev 2003)
@@ -0,0 +1,40 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on May 7, 2008
+ */
+package org.globus.cog.karajan.scheduler.submitQueue;
+
+import org.globus.cog.abstraction.interfaces.Status;
+import org.globus.cog.abstraction.interfaces.Task;
+import org.globus.cog.abstraction.interfaces.TaskHandler;
+
+public class NonBlockingCancel extends NonBlockingSubmit {
+
+ public NonBlockingCancel(TaskHandler th, Task task) {
+ super(th, task, null);
+ }
+
+ public void run() {
+ try {
+ getTaskHandler().cancel(getTask());
+ notifyPreviousQueue(null);
+ }
+ catch (Exception e) {
+ //force it
+ getTask().setStatus(Status.CANCELED);
+ notifyPreviousQueue(e);
+ }
+ catch (ThreadDeath td) {
+ throw td;
+ }
+ catch (Throwable t) {
+ notifyPreviousQueue(new Exception(t));
+ t.printStackTrace();
+ }
+ }
+}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <b_...@us...> - 2008-06-25 22:47:59
|
Revision: 2058
http://cogkit.svn.sourceforge.net/cogkit/?rev=2058&view=rev
Author: b_z_c
Date: 2008-06-25 15:47:10 -0700 (Wed, 25 Jun 2008)
Log Message:
-----------
hopefully better behaviour in the presence of bad sites
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java 2008-06-25 13:14:02 UTC (rev 2057)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java 2008-06-25 22:47:10 UTC (rev 2058)
@@ -11,6 +11,7 @@
import java.text.DecimalFormat;
import java.text.NumberFormat;
+import org.apache.log4j.Logger;
import org.globus.cog.karajan.util.BoundContact;
@@ -18,12 +19,15 @@
static final int MINWEIGHT = -10;
+ private static final Logger logger = Logger.getLogger(WeightedHost.class);
+
private BoundContact host;
private Double score;
private double tscore;
private int load;
private double delayedDelta;
private float jobThrottle;
+ private long lastUsed;
public WeightedHost(BoundContact contact, float jobThrottle) {
this(contact, 0.0, jobThrottle);
@@ -63,7 +67,8 @@
}
public final double getTScore() {
- return tscore;
+ if(tscore >= 1) return tscore;
+ if(isOverloaded()) return 0; else return 1;
}
public final BoundContact getHost() {
@@ -130,7 +135,25 @@
}
public boolean isOverloaded() {
- return !(load < maxLoad());
+ double ml = maxLoad();
+ if(tscore >= 1) {
+ // the site is mostly good. permit 1 or more jobs
+ // always.
+ logger.info("In load mode. score = "+score+" tscore = "+tscore+", maxload="+ml);
+ return !(load <= ml);
+ } else {
+ // the site is mostly bad. allow either 1 or 0 jobs
+ // based on time.
+ long now = System.currentTimeMillis();
+ long delay = now - lastUsed;
+ long permittedDelay = (long)(Math.exp(-(score.doubleValue()))*100.0);
+ boolean overloaded=(delay<permittedDelay);
+ // a score of -1 will give a permitted delay of around
+ // 270ms (100ms * e), and it will grow by a factor of e (~2.7x) every
+ // time the score goes down by one (which is once per failed job? roughly?)
+ logger.info("In delay mode. score = "+score+" tscore = "+tscore+", maxload="+ml+" delay since last used="+delay+"ms"+" permitted delay="+permittedDelay+"ms overloaded="+overloaded);
+ return overloaded;
+ }
}
public float getJobThrottle() {
@@ -138,6 +161,10 @@
}
public double maxLoad() {
- return jobThrottle * tscore + 2;
+ return jobThrottle * tscore + 1;
}
+
+ public void notifyUsed() {
+ lastUsed = System.currentTimeMillis();
+ }
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2008-06-25 13:14:02 UTC (rev 2057)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2008-06-25 22:47:10 UTC (rev 2058)
@@ -223,6 +223,7 @@
sorted.changeLoad(selected, 1);
selected.setDelayedDelta(successFactor);
+ selected.notifyUsed();
return selected.getHost();
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <b_...@us...> - 2008-06-30 16:08:19
|
Revision: 2065
http://cogkit.svn.sourceforge.net/cogkit/?rev=2065&view=rev
Author: b_z_c
Date: 2008-06-30 09:08:01 -0700 (Mon, 30 Jun 2008)
Log Message:
-----------
overload-chk2 patch from Mihael; fixes a problem introduced in r2058
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java 2008-06-30 15:05:39 UTC (rev 2064)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java 2008-06-30 16:08:01 UTC (rev 2065)
@@ -11,8 +11,9 @@
import java.text.DecimalFormat;
import java.text.NumberFormat;
+import java.util.Timer;
+
import org.apache.log4j.Logger;
-
import org.globus.cog.karajan.util.BoundContact;
public class WeightedHost implements Comparable {
@@ -70,7 +71,7 @@
public final double getTScore() {
if (tscore >= 1)
return tscore;
- if (isOverloaded())
+ if (isOverloaded() != 0)
return 0;
else
return 1;
@@ -116,7 +117,7 @@
public String toString() {
return host.toString() + ":" + D4.format(score) + "(" + D4.format(tscore) + "):" + load
- + "/" + (int) (maxLoad()) + (isOverloaded() ? " overloaded" : "");
+ + "/" + (int) (maxLoad()) + " overload: " + isOverloaded();
}
public int compareTo(Object o) {
@@ -139,7 +140,7 @@
this.delayedDelta = delayedDelta;
}
- public boolean isOverloaded() {
+ public int isOverloaded() {
double ml = maxLoad();
if (tscore >= 1) {
// the site is mostly good. permit 1 or more jobs
@@ -148,7 +149,7 @@
logger.debug("In load mode. score = " + score + " tscore = " + tscore + ", maxload="
+ ml);
}
- return !(load <= ml);
+ return load <= ml ? 0 : 1;
}
else {
// the site is mostly bad. allow either 1 or 0 jobs
@@ -165,7 +166,7 @@
+ ", maxload=" + ml + " delay since last used=" + delay + "ms"
+ " permitted delay=" + permittedDelay + "ms overloaded=" + overloaded);
}
- return overloaded;
+ return (int) (delay - permittedDelay);
}
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2008-06-30 15:05:39 UTC (rev 2064)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2008-06-30 16:08:01 UTC (rev 2065)
@@ -274,7 +274,7 @@
Iterator i = s.iterator();
while (i.hasNext()) {
WeightedHost wh = (WeightedHost) i.next();
- if (!wh.isOverloaded()) {
+ if (wh.isOverloaded() == 0) {
ns.add(wh);
}
}
@@ -284,7 +284,7 @@
Iterator i = s.iterator();
while (i.hasNext()) {
WeightedHost wh = (WeightedHost) i.next();
- if (wh.isOverloaded()) {
+ if (wh.isOverloaded() != 0) {
i.remove();
}
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java 2008-06-30 15:05:39 UTC (rev 2064)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java 2008-06-30 16:08:01 UTC (rev 2065)
@@ -12,6 +12,8 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.TreeSet;
import org.globus.cog.karajan.util.BoundContact;
@@ -21,8 +23,10 @@
private Map weightedHosts;
private double sum;
private double scoreHighCap;
- private int overloadedCount;
+ private volatile int overloadedCount;
+ private static final Timer timer = new Timer();
+
public WeightedHostSet(double scoreHighCap) {
init();
this.scoreHighCap = scoreHighCap;
@@ -38,34 +42,54 @@
scores.add(wh);
weightedHosts.put(wh.getHost(), wh);
sum += wh.getTScore();
- overloadedCount += wh.isOverloaded() ? 1 : 0;
+ overloadedCount += checkOverloaded(wh, 1);
}
public void changeScore(WeightedHost wh, double newScore) {
scores.remove(wh);
sum -= wh.getTScore();
- overloadedCount -= wh.isOverloaded() ? 1 : 0;
+ overloadedCount += checkOverloaded(wh, -1);
wh.setScore(newScore);
weightedHosts.put(wh.getHost(), wh);
scores.add(wh);
sum += wh.getTScore();
- overloadedCount += wh.isOverloaded() ? 1 : 0;
+ overloadedCount += checkOverloaded(wh, 1);
}
public void changeLoad(WeightedHost wh, int dl) {
- overloadedCount -= wh.isOverloaded() ? 1 : 0;
+ overloadedCount -= checkOverloaded(wh, -1);
wh.changeLoad(dl);
- overloadedCount += wh.isOverloaded() ? 1 : 0;
+ overloadedCount += checkOverloaded(wh, 1);
}
public double remove(WeightedHost wh) {
scores.remove(wh);
weightedHosts.remove(wh.getHost());
sum -= wh.getScore();
- overloadedCount -= wh.isOverloaded() ? 1 : 0;
+ overloadedCount += checkOverloaded(wh, -1);
return wh.getScore();
}
+ private int checkOverloaded(WeightedHost wh, final int dir) {
+ int v = wh.isOverloaded();
+ if (v >= 0) {
+ if (dir > 0) {
+ return v;
+ }
+ else {
+ return -v;
+ }
+ }
+ else {
+ timer.schedule(new TimerTask() {
+ public void run() {
+ overloadedCount -= dir;
+ }
+ }, -v);
+ return dir;
+ }
+ }
+
public WeightedHost findHost(BoundContact bc) {
return (WeightedHost) weightedHosts.get(bc);
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|