|
From: <ha...@us...> - 2006-11-06 19:52:41
|
Revision: 1340
http://svn.sourceforge.net/cogkit/?rev=1340&view=rev
Author: hategan
Date: 2006-11-06 11:52:31 -0800 (Mon, 06 Nov 2006)
Log Message:
-----------
added resource constraints
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
Added Paths:
-----------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ResourceConstraintChecker.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -21,6 +21,7 @@
import org.globus.cog.abstraction.interfaces.StatusListener;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.abstraction.interfaces.TaskHandler;
+import org.globus.cog.karajan.util.BoundContact;
import org.globus.cog.karajan.util.Contact;
import org.globus.cog.karajan.util.ContactSet;
import org.globus.cog.karajan.util.Queue;
@@ -48,6 +49,8 @@
private final Map constraints;
private List taskTransformers, failureHandlers;
+
+ private ResourceConstraintChecker constraintChecker;
public AbstractScheduler() {
super("Scheduler");
@@ -240,4 +243,30 @@
public void addFailureHandler(FailureHandler handler) {
failureHandlers.add(handler);
}
+
+ public ResourceConstraintChecker getConstraintChecker() {
+ return constraintChecker;
+ }
+
+ public void setConstraintChecker(ResourceConstraintChecker constraintChecker) {
+ this.constraintChecker = constraintChecker;
+ }
+
+ protected boolean checkConstraints(BoundContact resource, TaskConstraints tc) {
+ if (constraintChecker == null) {
+ return true;
+ }
+ else {
+ return constraintChecker.checkConstraints(resource, tc);
+ }
+ }
+
+ protected List checkConstraints(List resources, TaskConstraints tc) {
+ if (constraintChecker == null) {
+ return resources;
+ }
+ else {
+ return constraintChecker.checkConstraints(resources, tc);
+ }
+ }
}
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -0,0 +1,39 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Nov 3, 2006
+ */
+package org.globus.cog.karajan.scheduler;
+
+import org.globus.cog.abstraction.impl.common.task.TaskImpl;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.BoundContact;
+
+public class ContactAllocationTask extends TaskImpl {
+ private BoundContact contact;
+ private VariableStack stack;
+
+ public BoundContact getContact() {
+ return contact;
+ }
+
+ public void setContact(BoundContact contact) {
+ this.contact = contact;
+ }
+
+ public VariableStack getStack() {
+ return stack;
+ }
+
+ public void setStack(VariableStack stack) {
+ this.stack = stack;
+ }
+
+ public int getRequiredServices() {
+ return 1;
+ }
+}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java 2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -13,6 +13,7 @@
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.karajan.util.BoundContact;
import org.globus.cog.karajan.util.Contact;
+import org.globus.cog.karajan.util.ContactSet;
import org.globus.cog.karajan.util.TypeUtil;
public class DefaultScheduler extends LateBindingScheduler implements Scheduler, Runnable {
@@ -63,7 +64,9 @@
throws NoFreeResourceException {
checkGlobalLoadConditions();
int initial = contactCursor;
- while (!checkLoad(getResources().get(contactCursor))) {
+ ContactSet resources = getResources();
+ while (!checkLoad(resources.get(contactCursor))
+ || !checkConstraints(resources.get(contactCursor), t)) {
incContactCursor();
if (contactCursor == initial) {
logger.debug("No free resources");
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -423,6 +423,15 @@
public void submitBoundToServices(Task t, Contact[] contacts, Service[] services)
throws TaskSubmissionException {
+ if (t instanceof ContactAllocationTask) {
+ ((ContactAllocationTask) t).setContact((BoundContact) contacts[0]);
+ Status status = t.getStatus();
+ status.setPrevStatusCode(status.getStatusCode());
+ status.setStatusCode(Status.COMPLETED);
+ StatusEvent se = new StatusEvent(t, status);
+ fireJobStatusChangeEvent(se);
+ return;
+ }
applyTaskTransformers(t, contacts, services);
t.addStatusListener(this);
TaskHandler handler;
@@ -607,5 +616,4 @@
protected Contact[] getContacts(Task t) {
return (Contact[]) taskContacts.get(t);
}
-
}
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ResourceConstraintChecker.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ResourceConstraintChecker.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ResourceConstraintChecker.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -0,0 +1,21 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 18, 2006
+ */
+package org.globus.cog.karajan.scheduler;
+
+import java.util.List;
+
+import org.globus.cog.karajan.util.BoundContact;
+
+//I'm terrible with naming classes
+public interface ResourceConstraintChecker {
+ boolean checkConstraints(BoundContact resource, TaskConstraints tc);
+
+ List checkConstraints(List resources, TaskConstraints tc);
+}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java 2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -11,7 +11,7 @@
import org.globus.cog.karajan.util.BoundContact;
-public class WeightedHost {
+public class WeightedHost implements Comparable {
private BoundContact host;
private Double score;
@@ -51,4 +51,16 @@
public String toString() {
return host.toString();
}
+
+ public int compareTo(Object o) {
+ WeightedHost other = (WeightedHost) o;
+ int r = score.compareTo(other.score);
+ if (r == 0) {
+ //arbitrary ordering on the contact
+ return System.identityHashCode(host) - System.identityHashCode(other.host);
+ }
+ else {
+ return r;
+ }
+ }
}
\ No newline at end of file
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -38,11 +38,10 @@
public static final String FACTOR_FAILURE = "failureFactor";
public static final String SCORE_HIGH_CAP = "scoreHighCap";
public static final String SCORE_LOW_CAP = "scoreLowCap";
- public static final String RENORMALIZATION_DELAY = "renormalizationDelay";
+ public static final String NORMALIZATION_DELAY = "normalizationDelay";
public static final String POLICY = "policy";
private WeightedHostSet sorted;
- private double sum;
private int policy;
private int delay;
@@ -61,7 +60,7 @@
setFactor(FACTOR_FAILURE, 0.9);
setFactor(SCORE_HIGH_CAP, 100);
setFactor(SCORE_LOW_CAP, 0.001);
- setFactor(RENORMALIZATION_DELAY, 100);
+ setFactor(NORMALIZATION_DELAY, 100);
}
protected final void setFactor(String name, double value) {
@@ -83,21 +82,18 @@
protected void addToSorted(WeightedHost wh) {
sorted.add(wh);
- sum += wh.getScore();
}
protected synchronized void multiplyScore(WeightedHost wh, double factor) {
- double score = sorted.remove(wh);
+ double score = wh.getScore();
if (logger.isDebugEnabled()) {
logger.debug("multiplyScore(" + wh + ", " + factor + ")");
}
- sum -= score;
- WeightedHost nwh = new WeightedHost(wh.getHost(), checkCaps(score * factor));
+ double ns = checkCaps(score * factor);
+ sorted.changeScore(wh, ns);
if (logger.isDebugEnabled()) {
- logger.debug("Old score: " + score + ", new score: " + nwh.getScore());
+ logger.debug("Old score: " + score + ", new score: " + ns);
}
- sorted.add(nwh);
- sum += nwh.getScore();
}
protected double checkCaps(double score) {
@@ -112,17 +108,24 @@
}
}
- protected synchronized BoundContact getNextContact(TaskConstraints t) throws NoFreeResourceException {
+ protected synchronized BoundContact getNextContact(TaskConstraints t)
+ throws NoFreeResourceException {
checkGlobalLoadConditions();
BoundContact contact;
+
+ WeightedHostSet s = sorted;
+
+ s = constrain(s, getConstraintChecker(), t);
+
+ double sum = s.getSum();
if (policy == POLICY_WEIGHTED_RANDOM) {
double rand = Math.random() * sum;
if (logger.isDebugEnabled()) {
- logger.debug("Sorted: " + sorted);
+ logger.debug("Sorted: " + s);
logger.debug("Rand: " + rand + ", sum: " + sum);
}
- Iterator i = sorted.iterator();
- double sum = 0;
+ Iterator i = s.iterator();
+
while (i.hasNext()) {
WeightedHost wh = (WeightedHost) i.next();
sum += wh.getScore();
@@ -130,12 +133,12 @@
return wh.getHost();
}
}
- renormalize();
- contact = sorted.last().getHost();
+ normalize();
+ contact = s.last().getHost();
}
else if (policy == POLICY_BEST_SCORE) {
- renormalize();
- contact = sorted.last().getHost();
+ normalize();
+ contact = s.last().getHost();
}
else {
throw new KarajanRuntimeException("Invalid policy number: " + policy);
@@ -146,30 +149,32 @@
return contact;
}
- protected void renormalize() {
+ protected WeightedHostSet constrain(WeightedHostSet s, ResourceConstraintChecker rcc, TaskConstraints tc) {
+ if (rcc == null) {
+ return s;
+ }
+ else {
+ WeightedHostSet ns = new WeightedHostSet();
+ Iterator i = s.iterator();
+ while (i.hasNext()) {
+ WeightedHost wh = (WeightedHost) i.next();
+ if (rcc.checkConstraints(wh.getHost(), tc)) {
+ ns.add(wh);
+ }
+ }
+ return ns;
+ }
+ }
+
+ protected void normalize() {
delay++;
- if (delay > getFactor(RENORMALIZATION_DELAY)) {
+ if (delay > getFactor(NORMALIZATION_DELAY)) {
if (logger.isDebugEnabled()) {
- logger.debug("Renormalizing...");
+ logger.debug("Normalizing...");
logger.debug("Before normalization: " + sorted);
}
delay = 0;
- double prod = 1;
- Iterator i = sorted.iterator();
- while (i.hasNext()) {
- WeightedHost wh = (WeightedHost) i.next();
- prod *= wh.getScore();
- }
- double geomAvg = Math.pow(prod, 1.0 / sorted.size());
- double renormalizationFactor = 1 / geomAvg;
- i = sorted.iterator();
- sorted = new WeightedHostSet();
- while (i.hasNext()) {
- WeightedHost wh = (WeightedHost) i.next();
- WeightedHost nwh = new WeightedHost(wh.getHost(), checkCaps(wh.getScore()
- * renormalizationFactor));
- sorted.add(nwh);
- }
+ sorted.normalize(1);
if (logger.isDebugEnabled()) {
logger.debug("After normalization: " + sorted);
}
@@ -180,7 +185,7 @@
private static final String[] myPropertyNames = new String[] { POLICY,
FACTOR_CONNECTION_REFUSED, FACTOR_CONNECTION_TIMEOUT, FACTOR_SUBMISSION_TASK_LOAD,
FACTOR_TRANSFER_TASK_LOAD, FACTOR_FILEOP_TASK_LOAD, FACTOR_FAILURE, FACTOR_SUCCESS,
- SCORE_HIGH_CAP, SCORE_LOW_CAP, RENORMALIZATION_DELAY };
+ SCORE_HIGH_CAP, SCORE_LOW_CAP, NORMALIZATION_DELAY };
private static Set propertyNamesSet;
static {
@@ -236,10 +241,10 @@
Exception ex = e.getStatus().getException();
if (ex != null) {
String exs = ex.toString();
- if (exs.indexOf("Connection refused") > 0 || exs.indexOf("connection refused") > 0) {
+ if (exs.indexOf("Connection refused") >= 0 || exs.indexOf("connection refused") >= 0) {
factorMultiple(contacts, getFactor(FACTOR_CONNECTION_REFUSED));
}
- else if (exs.indexOf("timeout") > 0) {
+ else if (exs.indexOf("timeout") >= 0) {
factorMultiple(contacts, getFactor(FACTOR_CONNECTION_TIMEOUT));
}
}
@@ -250,15 +255,27 @@
private void factorSubmission(Task t, Contact[] contacts, int exp) {
// I wonder where the line between abstraction and obfuscation is...
if (t.getType() == Task.JOB_SUBMISSION) {
- factorMultiple(contacts, Math.pow(getFactor(FACTOR_SUBMISSION_TASK_LOAD), exp));
+ factorMultiple(contacts, spow(getFactor(FACTOR_SUBMISSION_TASK_LOAD), exp));
}
else if (t.getType() == Task.FILE_TRANSFER) {
- factorMultiple(contacts, Math.pow(getFactor(FACTOR_TRANSFER_TASK_LOAD), exp));
+ factorMultiple(contacts, spow(getFactor(FACTOR_TRANSFER_TASK_LOAD), exp));
}
else if (t.getType() == Task.FILE_OPERATION) {
- factorMultiple(contacts, Math.pow(getFactor(FACTOR_FILEOP_TASK_LOAD), exp));
+ factorMultiple(contacts, spow(getFactor(FACTOR_FILEOP_TASK_LOAD), exp));
}
}
+
+ private double spow(double x, int exp) {
+ if (exp == 1) {
+ return x;
+ }
+ else if (exp == -1) {
+ return 1/x;
+ }
+ else {
+ throw new IllegalArgumentException();
+ }
+ }
private void factorMultiple(Contact[] contacts, double factor) {
for (int i = 0; i < contacts.length; i++) {
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java 2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java 2006-11-06 19:52:31 UTC (rev 1340)
@@ -9,108 +9,75 @@
*/
package org.globus.cog.karajan.scheduler;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.Map.Entry;
+import java.util.Iterator;
+import java.util.TreeSet;
public class WeightedHostSet {
- private TreeMap map;
- private HashMap scores;
+ private TreeSet scores;
+ private double sum;
public WeightedHostSet() {
- map = new TreeMap();
- scores = new HashMap();
+ init();
}
+ protected void init() {
+ scores = new TreeSet();
+ sum = 0;
+ }
+
public synchronized void add(WeightedHost wh) {
- Double score = wh.getScoreAsDouble();
- Set s = (Set) map.get(score);
- if (s == null) {
- s = new HashSet();
- map.put(score, s);
- }
-
- s.add(wh);
- scores.put(wh, score);
+ scores.add(wh);
+ sum += wh.getScore();
}
+ public synchronized void changeScore(WeightedHost wh, double newScore) {
+ scores.remove(wh);
+ sum -= wh.getScore();
+ scores.add(new WeightedHost(wh.getHost(), newScore));
+ sum += newScore;
+ }
+
public synchronized double remove(WeightedHost wh) {
- Double score = (Double) scores.get(wh);
- Set s = (Set) map.get(score);
- if (s != null) {
- s.remove(wh);
- if (s.isEmpty()) {
- map.remove(score);
- }
- scores.remove(wh);
- }
- return score.doubleValue();
+ scores.remove(wh);
+ return wh.getScore();
}
-
- public java.util.Iterator iterator() {
- return new Iterator();
+
+ public Iterator iterator() {
+ return scores.iterator();
}
-
+
public WeightedHost last() {
- Set s = (Set) map.get(map.lastKey());
- return (WeightedHost) s.iterator().next();
+ return (WeightedHost) scores.last();
}
-
+
public int size() {
return scores.size();
}
-
- public String toString() {
- return map.toString();
+
+ public double getSum() {
+ return sum;
}
-
- private class Iterator implements java.util.Iterator{
- private final java.util.Iterator mapi;
- private java.util.Iterator seti;
- private WeightedHost lasth;
- private Set lasts;
-
-
- public Iterator() {
- mapi = map.entrySet().iterator();
- if (mapi.hasNext()) {
- Entry next = (Entry) mapi.next();
- lasts = (Set) next.getValue();
- seti = lasts.iterator();
- }
- }
- public boolean hasNext() {
- return seti.hasNext() || mapi.hasNext();
+ protected synchronized void normalize(double target) {
+ double prod = 1;
+ Iterator i = scores.iterator();
+ while (i.hasNext()) {
+ WeightedHost wh = (WeightedHost) i.next();
+ prod *= wh.getScore();
}
-
- public Object next() {
- if (seti.hasNext()) {
- lasth = (WeightedHost) seti.next();
- return lasth;
- }
- else if (mapi.hasNext()) {
- Entry next = (Entry) mapi.next();
- lasts = (Set) next.getValue();
- seti = lasts.iterator();
- lasth = (WeightedHost) seti.next();
- return lasth;
- }
- else {
- throw new NoSuchElementException();
- }
+ double geomAvg = Math.pow(prod, target / scores.size());
+ double renormalizationFactor = 1 / geomAvg;
+ i = scores.iterator();
+ scores = new TreeSet();
+ while (i.hasNext()) {
+ WeightedHost wh = (WeightedHost) i.next();
+ WeightedHost nwh = new WeightedHost(wh.getHost(), wh.getScore()
+ * renormalizationFactor);
+ add(nwh);
}
-
- public void remove() {
- seti.remove();
- if (lasts.isEmpty()) {
- mapi.remove();
- }
- scores.remove(lasth);
- }
}
+ public String toString() {
+ return scores.toString();
+ }
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|