| 
     
      
      
      From: <ha...@us...> - 2006-11-06 19:52:41
       
   | 
Revision: 1340
          http://svn.sourceforge.net/cogkit/?rev=1340&view=rev
Author:   hategan
Date:     2006-11-06 11:52:31 -0800 (Mon, 06 Nov 2006)
Log Message:
-----------
added resource constraints
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
Added Paths:
-----------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ResourceConstraintChecker.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java	2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java	2006-11-06 19:52:31 UTC (rev 1340)
@@ -21,6 +21,7 @@
 import org.globus.cog.abstraction.interfaces.StatusListener;
 import org.globus.cog.abstraction.interfaces.Task;
 import org.globus.cog.abstraction.interfaces.TaskHandler;
+import org.globus.cog.karajan.util.BoundContact;
 import org.globus.cog.karajan.util.Contact;
 import org.globus.cog.karajan.util.ContactSet;
 import org.globus.cog.karajan.util.Queue;
@@ -48,6 +49,8 @@
 	private final Map constraints;
 
 	private List taskTransformers, failureHandlers;
+	
+	private ResourceConstraintChecker constraintChecker;
 
 	public AbstractScheduler() {
 		super("Scheduler");
@@ -240,4 +243,30 @@
 	public void addFailureHandler(FailureHandler handler) {
 		failureHandlers.add(handler);
 	}
+
+	public ResourceConstraintChecker getConstraintChecker() {
+		return constraintChecker;
+	}
+
+	public void setConstraintChecker(ResourceConstraintChecker constraintChecker) {
+		this.constraintChecker = constraintChecker;
+	}
+	
+	protected boolean checkConstraints(BoundContact resource, TaskConstraints tc) {
+		if (constraintChecker == null) {
+			return true;
+		}
+		else {
+			return constraintChecker.checkConstraints(resource, tc);
+		}
+	}
+	
+	protected List checkConstraints(List resources, TaskConstraints tc) {
+		if (constraintChecker == null) {
+			return resources;
+		}
+		else {
+			return constraintChecker.checkConstraints(resources, tc);
+		}
+	}
 }
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java	                        (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java	2006-11-06 19:52:31 UTC (rev 1340)
@@ -0,0 +1,39 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Nov 3, 2006
+ */
+package org.globus.cog.karajan.scheduler;
+
+import org.globus.cog.abstraction.impl.common.task.TaskImpl;
+import org.globus.cog.karajan.stack.VariableStack;
+import org.globus.cog.karajan.util.BoundContact;
+
+public class ContactAllocationTask extends TaskImpl {
+	private BoundContact contact;
+	private VariableStack stack;
+
+	public BoundContact getContact() {
+		return contact;
+	}
+
+	public void setContact(BoundContact contact) {
+		this.contact = contact;
+	}
+
+	public VariableStack getStack() {
+		return stack;
+	}
+
+	public void setStack(VariableStack stack) {
+		this.stack = stack;
+	}
+
+	public int getRequiredServices() {
+		return 1;
+	}
+}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java	2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java	2006-11-06 19:52:31 UTC (rev 1340)
@@ -13,6 +13,7 @@
 import org.globus.cog.abstraction.interfaces.Task;
 import org.globus.cog.karajan.util.BoundContact;
 import org.globus.cog.karajan.util.Contact;
+import org.globus.cog.karajan.util.ContactSet;
 import org.globus.cog.karajan.util.TypeUtil;
 
 public class DefaultScheduler extends LateBindingScheduler implements Scheduler, Runnable {
@@ -63,7 +64,9 @@
 			throws NoFreeResourceException {
 		checkGlobalLoadConditions();
 		int initial = contactCursor;
-		while (!checkLoad(getResources().get(contactCursor))) {
+		ContactSet resources = getResources();
+		while (!checkLoad(resources.get(contactCursor))
+				|| !checkConstraints(resources.get(contactCursor), t)) {
 			incContactCursor();
 			if (contactCursor == initial) {
 				logger.debug("No free resources");
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2006-11-06 19:52:31 UTC (rev 1340)
@@ -423,6 +423,15 @@
 
 	public void submitBoundToServices(Task t, Contact[] contacts, Service[] services)
 			throws TaskSubmissionException {
+		if (t instanceof ContactAllocationTask) {
+			((ContactAllocationTask) t).setContact((BoundContact) contacts[0]);
+			Status status = t.getStatus();
+			status.setPrevStatusCode(status.getStatusCode());
+			status.setStatusCode(Status.COMPLETED);
+			StatusEvent se = new StatusEvent(t, status);
+			fireJobStatusChangeEvent(se);
+			return;
+		}
 		applyTaskTransformers(t, contacts, services);
 		t.addStatusListener(this);
 		TaskHandler handler;
@@ -607,5 +616,4 @@
 	protected Contact[] getContacts(Task t) {
 		return (Contact[]) taskContacts.get(t);
 	}
-
 }
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ResourceConstraintChecker.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ResourceConstraintChecker.java	                        (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ResourceConstraintChecker.java	2006-11-06 19:52:31 UTC (rev 1340)
@@ -0,0 +1,21 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Oct 18, 2006
+ */
+package org.globus.cog.karajan.scheduler;
+
+import java.util.List;
+
+import org.globus.cog.karajan.util.BoundContact;
+
+//I'm terrible with naming classes
+public interface ResourceConstraintChecker {
+	boolean checkConstraints(BoundContact resource, TaskConstraints tc);
+	
+	List checkConstraints(List resources, TaskConstraints tc);
+}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java	2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java	2006-11-06 19:52:31 UTC (rev 1340)
@@ -11,7 +11,7 @@
 
 import org.globus.cog.karajan.util.BoundContact;
 
-public class WeightedHost {
+public class WeightedHost implements Comparable {
 	private BoundContact host;
 	private Double score;
 
@@ -51,4 +51,16 @@
 	public String toString() {
 		return host.toString();
 	}
+
+	public int compareTo(Object o) {
+		WeightedHost other = (WeightedHost) o;
+		int r = score.compareTo(other.score);
+		if (r == 0) {
+			//arbitrary ordering on the contact
+			return System.identityHashCode(host) - System.identityHashCode(other.host);
+		}
+		else {
+			return r;
+		}
+	}
 }
\ No newline at end of file
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2006-11-06 19:52:31 UTC (rev 1340)
@@ -38,11 +38,10 @@
 	public static final String FACTOR_FAILURE = "failureFactor";
 	public static final String SCORE_HIGH_CAP = "scoreHighCap";
 	public static final String SCORE_LOW_CAP = "scoreLowCap";
-	public static final String RENORMALIZATION_DELAY = "renormalizationDelay";
+	public static final String NORMALIZATION_DELAY = "normalizationDelay";
 	public static final String POLICY = "policy";
 
 	private WeightedHostSet sorted;
-	private double sum;
 	private int policy;
 	private int delay;
 
@@ -61,7 +60,7 @@
 		setFactor(FACTOR_FAILURE, 0.9);
 		setFactor(SCORE_HIGH_CAP, 100);
 		setFactor(SCORE_LOW_CAP, 0.001);
-		setFactor(RENORMALIZATION_DELAY, 100);
+		setFactor(NORMALIZATION_DELAY, 100);
 	}
 
 	protected final void setFactor(String name, double value) {
@@ -83,21 +82,18 @@
 
 	protected void addToSorted(WeightedHost wh) {
 		sorted.add(wh);
-		sum += wh.getScore();
 	}
 
 	protected synchronized void multiplyScore(WeightedHost wh, double factor) {
-		double score = sorted.remove(wh);
+		double score = wh.getScore();
 		if (logger.isDebugEnabled()) {
 			logger.debug("multiplyScore(" + wh + ", " + factor + ")");
 		}
-		sum -= score;
-		WeightedHost nwh = new WeightedHost(wh.getHost(), checkCaps(score * factor));
+		double ns = checkCaps(score * factor);
+		sorted.changeScore(wh, ns);
 		if (logger.isDebugEnabled()) {
-			logger.debug("Old score: " + score + ", new score: " + nwh.getScore());
+			logger.debug("Old score: " + score + ", new score: " + ns);
 		}
-		sorted.add(nwh);
-		sum += nwh.getScore();
 	}
 
 	protected double checkCaps(double score) {
@@ -112,17 +108,24 @@
 		}
 	}
 
-	protected synchronized BoundContact getNextContact(TaskConstraints t) throws NoFreeResourceException {
+	protected synchronized BoundContact getNextContact(TaskConstraints t)
+			throws NoFreeResourceException {
 		checkGlobalLoadConditions();
 		BoundContact contact;
+		
+		WeightedHostSet s = sorted;
+		
+		s = constrain(s, getConstraintChecker(), t);
+		
+		double sum = s.getSum();
 		if (policy == POLICY_WEIGHTED_RANDOM) {
 			double rand = Math.random() * sum;
 			if (logger.isDebugEnabled()) {
-				logger.debug("Sorted: " + sorted);
+				logger.debug("Sorted: " + s);
 				logger.debug("Rand: " + rand + ", sum: " + sum);
 			}
-			Iterator i = sorted.iterator();
-			double sum = 0;
+			Iterator i = s.iterator();
+			
 			while (i.hasNext()) {
 				WeightedHost wh = (WeightedHost) i.next();
 				sum += wh.getScore();
@@ -130,12 +133,12 @@
 					return wh.getHost();
 				}
 			}
-			renormalize();
-			contact = sorted.last().getHost();
+			normalize();
+			contact = s.last().getHost();
 		}
 		else if (policy == POLICY_BEST_SCORE) {
-			renormalize();
-			contact = sorted.last().getHost();
+			normalize();
+			contact = s.last().getHost();
 		}
 		else {
 			throw new KarajanRuntimeException("Invalid policy number: " + policy);
@@ -146,30 +149,32 @@
 		return contact;
 	}
 
-	protected void renormalize() {
+	protected WeightedHostSet constrain(WeightedHostSet s, ResourceConstraintChecker rcc, TaskConstraints tc) {
+		if (rcc == null) {
+			return s;
+		}
+		else {
+			WeightedHostSet ns = new WeightedHostSet();
+			Iterator i = s.iterator();
+			while (i.hasNext()) {
+				WeightedHost wh = (WeightedHost) i.next();
+				if (rcc.checkConstraints(wh.getHost(), tc)) {
+					ns.add(wh);
+				}
+			}
+			return ns;
+		}
+	}
+
+	protected void normalize() {
 		delay++;
-		if (delay > getFactor(RENORMALIZATION_DELAY)) {
+		if (delay > getFactor(NORMALIZATION_DELAY)) {
 			if (logger.isDebugEnabled()) {
-				logger.debug("Renormalizing...");
+				logger.debug("Normalizing...");
 				logger.debug("Before normalization: " + sorted);
 			}
 			delay = 0;
-			double prod = 1;
-			Iterator i = sorted.iterator();
-			while (i.hasNext()) {
-				WeightedHost wh = (WeightedHost) i.next();
-				prod *= wh.getScore();
-			}
-			double geomAvg = Math.pow(prod, 1.0 / sorted.size());
-			double renormalizationFactor = 1 / geomAvg;
-			i = sorted.iterator();
-			sorted = new WeightedHostSet();
-			while (i.hasNext()) {
-				WeightedHost wh = (WeightedHost) i.next();
-				WeightedHost nwh = new WeightedHost(wh.getHost(), checkCaps(wh.getScore()
-						* renormalizationFactor));
-				sorted.add(nwh);
-			}
+			sorted.normalize(1);
 			if (logger.isDebugEnabled()) {
 				logger.debug("After normalization: " + sorted);
 			}
@@ -180,7 +185,7 @@
 	private static final String[] myPropertyNames = new String[] { POLICY,
 			FACTOR_CONNECTION_REFUSED, FACTOR_CONNECTION_TIMEOUT, FACTOR_SUBMISSION_TASK_LOAD,
 			FACTOR_TRANSFER_TASK_LOAD, FACTOR_FILEOP_TASK_LOAD, FACTOR_FAILURE, FACTOR_SUCCESS,
-			SCORE_HIGH_CAP, SCORE_LOW_CAP, RENORMALIZATION_DELAY };
+			SCORE_HIGH_CAP, SCORE_LOW_CAP, NORMALIZATION_DELAY };
 	private static Set propertyNamesSet;
 
 	static {
@@ -236,10 +241,10 @@
 			Exception ex = e.getStatus().getException();
 			if (ex != null) {
 				String exs = ex.toString();
-				if (exs.indexOf("Connection refused") > 0 || exs.indexOf("connection refused") > 0) {
+				if (exs.indexOf("Connection refused") >= 0 || exs.indexOf("connection refused") >= 0) {
 					factorMultiple(contacts, getFactor(FACTOR_CONNECTION_REFUSED));
 				}
-				else if (exs.indexOf("timeout") > 0) {
+				else if (exs.indexOf("timeout") >= 0) {
 					factorMultiple(contacts, getFactor(FACTOR_CONNECTION_TIMEOUT));
 				}
 			}
@@ -250,15 +255,27 @@
 	private void factorSubmission(Task t, Contact[] contacts, int exp) {
 		// I wonder where the line between abstraction and obfuscation is...
 		if (t.getType() == Task.JOB_SUBMISSION) {
-			factorMultiple(contacts, Math.pow(getFactor(FACTOR_SUBMISSION_TASK_LOAD), exp));
+			factorMultiple(contacts, spow(getFactor(FACTOR_SUBMISSION_TASK_LOAD), exp));
 		}
 		else if (t.getType() == Task.FILE_TRANSFER) {
-			factorMultiple(contacts, Math.pow(getFactor(FACTOR_TRANSFER_TASK_LOAD), exp));
+			factorMultiple(contacts, spow(getFactor(FACTOR_TRANSFER_TASK_LOAD), exp));
 		}
 		else if (t.getType() == Task.FILE_OPERATION) {
-			factorMultiple(contacts, Math.pow(getFactor(FACTOR_FILEOP_TASK_LOAD), exp));
+			factorMultiple(contacts, spow(getFactor(FACTOR_FILEOP_TASK_LOAD), exp));
 		}
 	}
+	
+	private double spow(double x, int exp) {
+		if (exp == 1) {
+			return x;
+		}
+		else if (exp == -1) {
+			return 1/x;
+		}
+		else {
+			throw new IllegalArgumentException();
+		}
+	}
 
 	private void factorMultiple(Contact[] contacts, double factor) {
 		for (int i = 0; i < contacts.length; i++) {
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java	2006-11-06 18:55:27 UTC (rev 1339)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java	2006-11-06 19:52:31 UTC (rev 1340)
@@ -9,108 +9,75 @@
  */
 package org.globus.cog.karajan.scheduler;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.Map.Entry;
+import java.util.Iterator;
+import java.util.TreeSet;
 
 public class WeightedHostSet {
-	private TreeMap map;
-	private HashMap scores;
+	private TreeSet scores;
+	private double sum;
 
 	public WeightedHostSet() {
-		map = new TreeMap();
-		scores = new HashMap();
+		init();
 	}
 	
+	protected void init() {
+		scores = new TreeSet();
+		sum = 0;
+	}
+
 	public synchronized void add(WeightedHost wh) {
-		Double score = wh.getScoreAsDouble();
-		Set s = (Set) map.get(score);
-		if (s == null) {
-			s = new HashSet();
-			map.put(score, s);
-		}
-		
-		s.add(wh);
-		scores.put(wh, score);
+		scores.add(wh);
+		sum += wh.getScore();
 	}
 	
+	public synchronized void changeScore(WeightedHost wh, double newScore) {
+		scores.remove(wh);
+		sum -= wh.getScore();
+		scores.add(new WeightedHost(wh.getHost(), newScore));
+		sum += newScore;
+	}
+
 	public synchronized double remove(WeightedHost wh) {
-		Double score = (Double) scores.get(wh);
-		Set s = (Set) map.get(score);
-		if (s != null) {
-			s.remove(wh);
-			if (s.isEmpty()) {
-				map.remove(score);
-			}
-			scores.remove(wh);
-		}
-		return score.doubleValue();
+		scores.remove(wh);
+		return wh.getScore();
 	}
-	
-	public java.util.Iterator iterator() {
-		return new Iterator();
+
+	public Iterator iterator() {
+		return scores.iterator(); 
 	}
-	
+
 	public WeightedHost last() {
-		Set s = (Set) map.get(map.lastKey());
-		return (WeightedHost) s.iterator().next();
+		return (WeightedHost) scores.last();
 	}
-	
+
 	public int size() {
 		return scores.size();
 	}
-	
-	public String toString() {
-		return map.toString();
+
+	public double getSum() {
+		return sum;
 	}
-	
-	private class Iterator implements java.util.Iterator{
-		private final java.util.Iterator mapi;
-		private java.util.Iterator seti;
-		private WeightedHost lasth;
-		private Set lasts;
-		
-		
-		public Iterator() {
-			mapi = map.entrySet().iterator();
-			if (mapi.hasNext()) {
-				Entry next = (Entry) mapi.next();
-				lasts = (Set) next.getValue();
-				seti = lasts.iterator();
-			}
-		}
 
-		public boolean hasNext() {
-			return seti.hasNext() || mapi.hasNext();
+	protected synchronized void normalize(double target) {
+		double prod = 1;
+		Iterator i = scores.iterator();
+		while (i.hasNext()) {
+			WeightedHost wh = (WeightedHost) i.next();
+			prod *= wh.getScore();
 		}
-
-		public Object next() {
-			if (seti.hasNext()) {
-				lasth =  (WeightedHost) seti.next();
-				return lasth;
-			}
-			else if (mapi.hasNext()) {
-				Entry next = (Entry) mapi.next();
-				lasts = (Set) next.getValue();
-				seti = lasts.iterator();
-				lasth = (WeightedHost) seti.next();
-				return lasth;
-			}
-			else {
-				throw new NoSuchElementException();
-			}
+		double geomAvg = Math.pow(prod, target / scores.size());
+		double renormalizationFactor = 1 / geomAvg;
+		i = scores.iterator();
+		scores = new TreeSet();
+		while (i.hasNext()) {
+			WeightedHost wh = (WeightedHost) i.next();
+			WeightedHost nwh = new WeightedHost(wh.getHost(), wh.getScore()
+					* renormalizationFactor);
+			add(nwh);
 		}
-
-		public void remove() {
-			seti.remove();
-			if (lasts.isEmpty()) {
-				mapi.remove();
-			}
-			scores.remove(lasth);
-		}
 	}
 
+	public String toString() {
+		return scores.toString();
+	}
 }
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
     
      
      
      From: <ha...@us...> - 2006-11-30 22:31:30
       
   | 
Revision: 1440
          http://svn.sourceforge.net/cogkit/?rev=1440&view=rev
Author:   hategan
Date:     2006-11-30 14:31:21 -0800 (Thu, 30 Nov 2006)
Log Message:
-----------
tweaks (fixes, actually) to the WeightedHost (wh) score scheduler
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java	2006-11-30 22:29:56 UTC (rev 1439)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java	2006-11-30 22:31:21 UTC (rev 1440)
@@ -9,33 +9,76 @@
  */
 package org.globus.cog.karajan.scheduler;
 
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+
 import org.globus.cog.karajan.util.BoundContact;
 
 public class WeightedHost implements Comparable {
 	private BoundContact host;
 	private Double score;
+	private double tscore;
+	private int load;
+	private double delayedDelta;
 
 	public WeightedHost(BoundContact contact) {
-		this(contact, 1.0);
+		this(contact, 0.0);
 	}
 
 	public WeightedHost(BoundContact contact, double score) {
+		this(contact, score, 0);
+	}
+	
+	public WeightedHost(BoundContact contact, double score, int load) {
 		this.host = contact;
+		setScore(score);
+		this.load = load;
+	}
+
+	protected void setScore(double score) {
 		this.score = new Double(score);
+		this.tscore = smooth(score);
 	}
 
+	public static final double T = 100;
+	public static final double B = 2.0*Math.log(T)/Math.PI;
+	public static final double C = 0.2;
+
+	public double smooth(double score) {
+		return Math.exp(B*Math.atan(C*score));
+	}
+
 	public final double getScore() {
 		return score.doubleValue();
 	}
-	
+
 	public final Double getScoreAsDouble() {
 		return score;
 	}
-	
+
+	public final double getTScore() {
+		return tscore;
+	}
+
 	public final BoundContact getHost() {
 		return host;
 	}
 
+	public int getLoad() {
+		return load;
+	}
+
+	public void setLoad(int load) {
+		this.load = load;
+	}
+
+	public synchronized void changeLoad(int dl) {
+		load += dl;
+		if (load < 0) {
+			load = 0;
+		}
+	}
+
 	public boolean equals(Object obj) {
 		if (obj instanceof WeightedHost) {
 			WeightedHost wh = (WeightedHost) obj;
@@ -48,19 +91,36 @@
 		return host.hashCode();
 	}
 
+	public static final NumberFormat D4;
+	static {
+		D4 = DecimalFormat.getInstance();
+		D4.setMaximumFractionDigits(3);
+		D4.setMinimumFractionDigits(3);
+	}
+
 	public String toString() {
-		return host.toString();
+		return host.toString() + ":" + D4.format(score) + "(" + D4.format(tscore) + "):"+load;
 	}
 
 	public int compareTo(Object o) {
 		WeightedHost other = (WeightedHost) o;
 		int r = score.compareTo(other.score);
 		if (r == 0) {
-			//arbitrary ordering on the contact
+			// arbitrary ordering on the contact
 			return System.identityHashCode(host) - System.identityHashCode(other.host);
 		}
 		else {
 			return r;
 		}
 	}
+
+	public double getDelayedDelta() {
+		return delayedDelta;
+	}
+
+	public void setDelayedDelta(double delayedDelta) {
+		this.delayedDelta = delayedDelta;
+	}
+	
+	
 }
\ No newline at end of file
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2006-11-30 22:29:56 UTC (rev 1439)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2006-11-30 22:31:21 UTC (rev 1440)
@@ -27,7 +27,6 @@
 import org.globus.cog.karajan.workflow.KarajanRuntimeException;
 
 public class WeightedHostScoreScheduler extends LateBindingScheduler {
-
 	private static final Logger logger = Logger.getLogger(WeightedHostScoreScheduler.class);
 
 	public static final int POLICY_WEIGHTED_RANDOM = 0;
@@ -40,43 +39,40 @@
 	public static final String FACTOR_SUCCESS = "successFactor";
 	public static final String FACTOR_FAILURE = "failureFactor";
 	public static final String SCORE_HIGH_CAP = "scoreHighCap";
-	public static final String SCORE_LOW_CAP = "scoreLowCap";
-	public static final String NORMALIZATION_DELAY = "normalizationDelay";
 	public static final String POLICY = "policy";
 
 	private WeightedHostSet sorted;
 	private int policy;
-	private int delay;
 
 	/*
 	 * These field names must match the property names
 	 */
 	private double connectionRefusedFactor, connectionTimeoutFactor, jobSubmissionTaskLoadFactor,
 			transferTaskLoadFactor, fileOperationTaskLoadFactor, successFactor, failureFactor,
-			scoreHighCap, scoreLowCap;
-	private int normalizationDelay;
+			scoreHighCap;
 
+	private int jobThrottle;
+
 	public WeightedHostScoreScheduler() {
 		policy = POLICY_WEIGHTED_RANDOM;
 		setDefaultFactors();
 	}
 
 	protected final void setDefaultFactors() {
-		connectionRefusedFactor = 0.1;
-		connectionTimeoutFactor = 0.05;
-		jobSubmissionTaskLoadFactor = 0.9;
-		transferTaskLoadFactor = 0.9;
-		fileOperationTaskLoadFactor = 0.95;
-		successFactor = 1.01;
-		failureFactor = 0.9;
+		connectionRefusedFactor = -10;
+		connectionTimeoutFactor = -20;
+		jobSubmissionTaskLoadFactor = -0.2;
+		transferTaskLoadFactor = -0.2;
+		fileOperationTaskLoadFactor = -0.01;
+		successFactor = 1;
+		failureFactor = -0.1;
 		scoreHighCap = 100;
-		scoreLowCap = 0.01;
-		normalizationDelay = 100;
+		jobThrottle = 2;
 	}
 
 	public void setResources(ContactSet grid) {
 		super.setResources(grid);
-		sorted = new WeightedHostSet();
+		sorted = new WeightedHostSet(scoreHighCap);
 		Iterator i = grid.getContacts().iterator();
 		while (i.hasNext()) {
 			addToSorted(new WeightedHost((BoundContact) i.next()));
@@ -92,78 +88,100 @@
 		if (logger.isDebugEnabled()) {
 			logger.debug("multiplyScore(" + wh + ", " + factor + ")");
 		}
-		double ns = checkCaps(score * factor);
+		double ns = factor(score, factor);
 		sorted.changeScore(wh, ns);
 		if (logger.isDebugEnabled()) {
 			logger.debug("Old score: " + score + ", new score: " + ns);
 		}
 	}
 
-	protected double checkCaps(double score) {
-		if (score > scoreHighCap) {
-			return scoreHighCap;
-		}
-		else if (score < scoreLowCap) {
-			return scoreLowCap;
-		}
-		else {
-			return score;
-		}
+	protected synchronized void multiplyScoreLater(WeightedHost wh, double factor) {
+		wh.setDelayedDelta(wh.getDelayedDelta() + factor);
 	}
 
+	protected double factor(double score, double factor) {
+		return score + factor;
+	}
+
 	protected synchronized BoundContact getNextContact(TaskConstraints t)
 			throws NoFreeResourceException {
 		checkGlobalLoadConditions();
 		BoundContact contact;
 
 		WeightedHostSet s = sorted;
+		WeightedHost selected = null;
 
 		s = constrain(s, getConstraintChecker(), t);
 
 		double sum = s.getSum();
 		if (policy == POLICY_WEIGHTED_RANDOM) {
 			double rand = Math.random() * sum;
+			if (logger.isInfoEnabled() && !s.isEmpty()) {
+				logger.info("Sorted: " + s);
+			}
 			if (logger.isDebugEnabled()) {
-				logger.debug("Sorted: " + s);
 				logger.debug("Rand: " + rand + ", sum: " + sum);
 			}
 			Iterator i = s.iterator();
-			
+
 			sum = 0;
 			while (i.hasNext()) {
 				WeightedHost wh = (WeightedHost) i.next();
-				sum += wh.getScore();
+				sum += wh.getTScore();
 				if (sum >= rand) {
-					return wh.getHost();
+					selected = wh;
+					break;
 				}
 			}
-			normalize();
-			contact = s.last().getHost();
+			if (selected == null) {
+				if (s.isEmpty()) {
+					throw new NoFreeResourceException();
+				}
+				else {
+					selected = s.last();
+				}
+			}
 		}
 		else if (policy == POLICY_BEST_SCORE) {
-			normalize();
-			contact = s.last().getHost();
+			selected = s.last();
 		}
 		else {
 			throw new KarajanRuntimeException("Invalid policy number: " + policy);
 		}
 		if (logger.isDebugEnabled()) {
-			logger.debug("Next contact: " + contact);
+			logger.debug("Next contact: " + selected.getHost());
 		}
-		return contact;
+		selected.changeLoad(1);
+		selected.setDelayedDelta(successFactor);
+		return selected.getHost();
 	}
 
+	public void releaseContact(BoundContact contact) {
+		if (logger.isDebugEnabled()) {
+			logger.debug("Releasing contact " + contact);
+		}
+		super.releaseContact(contact);
+		WeightedHost wh = sorted.findHost(contact);
+		if (wh != null) {
+			wh.changeLoad(-1);
+			sorted.changeScore(wh, wh.getScore() + wh.getDelayedDelta());
+		}
+		else {
+			logger.warn("ghost contact (" + contact + ") in releaseContact");
+		}
+	}
+
 	protected WeightedHostSet constrain(WeightedHostSet s, ResourceConstraintChecker rcc,
 			TaskConstraints tc) {
 		if (rcc == null) {
 			return s;
 		}
 		else {
-			WeightedHostSet ns = new WeightedHostSet();
+			WeightedHostSet ns = new WeightedHostSet(scoreHighCap);
 			Iterator i = s.iterator();
 			while (i.hasNext()) {
 				WeightedHost wh = (WeightedHost) i.next();
-				if (rcc.checkConstraints(wh.getHost(), tc)) {
+				if (rcc.checkConstraints(wh.getHost(), tc) && notOverloaded(wh)) {
 					ns.add(wh);
 				}
 			}
@@ -171,26 +189,17 @@
 		}
 	}
 
-	protected void normalize() {
-		delay++;
-		if (delay > normalizationDelay) {
-			if (logger.isDebugEnabled()) {
-				logger.debug("Normalizing...");
-				logger.debug("Before normalization: " + sorted);
-			}
-			delay = 0;
-			sorted.normalize(1);
-			if (logger.isDebugEnabled()) {
-				logger.debug("After normalization: " + sorted);
-			}
-		}
+	protected boolean notOverloaded(WeightedHost wh) {
+		double score = wh.getTScore();
+		int load = wh.getLoad();
+		return load < jobThrottle * score + 2;
 	}
 
 	private static String[] propertyNames;
 	private static final String[] myPropertyNames = new String[] { POLICY,
 			FACTOR_CONNECTION_REFUSED, FACTOR_CONNECTION_TIMEOUT, FACTOR_SUBMISSION_TASK_LOAD,
 			FACTOR_TRANSFER_TASK_LOAD, FACTOR_FILEOP_TASK_LOAD, FACTOR_FAILURE, FACTOR_SUCCESS,
-			SCORE_HIGH_CAP, SCORE_LOW_CAP, NORMALIZATION_DELAY };
+			SCORE_HIGH_CAP };
 	private static Set propertyNamesSet;
 
 	static {
@@ -262,19 +271,19 @@
 			}
 			else if (code == Status.COMPLETED) {
 				factorSubmission(t, contacts, -1);
-				factorMultiple(contacts, successFactor);
+				factorMultipleLater(contacts, successFactor);
 			}
 			else if (code == Status.FAILED) {
-				factorMultiple(contacts, failureFactor);
+				factorMultipleLater(contacts, failureFactor);
 				Exception ex = e.getStatus().getException();
 				if (ex != null) {
 					String exs = ex.toString();
 					if (exs.indexOf("Connection refused") >= 0
 							|| exs.indexOf("connection refused") >= 0) {
-						factorMultiple(contacts, connectionRefusedFactor);
+						factorMultipleLater(contacts, connectionRefusedFactor);
 					}
 					else if (exs.indexOf("timeout") >= 0) {
-						factorMultiple(contacts, connectionTimeoutFactor);
+						factorMultipleLater(contacts, connectionTimeoutFactor);
 					}
 				}
 			}
@@ -305,7 +314,7 @@
 			return x;
 		}
 		else if (exp == -1) {
-			return 1 / x;
+			return -x;
 		}
 		else {
 			throw new IllegalArgumentException();
@@ -321,4 +330,14 @@
 			}
 		}
 	}
+
+	private void factorMultipleLater(Contact[] contacts, double factor) {
+		for (int i = 0; i < contacts.length; i++) {
+			BoundContact bc = (BoundContact) contacts[i];
+			WeightedHost wh = sorted.findHost(bc);
+			if (wh != null) {
+				multiplyScoreLater(wh, factor);
+			}
+		}
+	}
 }
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java	2006-11-30 22:29:56 UTC (rev 1439)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java	2006-11-30 22:31:21 UTC (rev 1440)
@@ -20,11 +20,13 @@
 	private TreeSet scores;
 	private Map weightedHosts;
 	private double sum;
+	private double scoreHighCap;
 
-	public WeightedHostSet() {
+	public WeightedHostSet(double scoreHighCap) {
 		init();
+		this.scoreHighCap = scoreHighCap;
 	}
-	
+
 	protected void init() {
 		scores = new TreeSet();
 		weightedHosts = new HashMap();
@@ -34,16 +36,16 @@
 	public synchronized void add(WeightedHost wh) {
 		scores.add(wh);
 		weightedHosts.put(wh.getHost(), wh);
-		sum += wh.getScore();
+		sum += wh.getTScore();
 	}
-	
+
 	public synchronized void changeScore(WeightedHost wh, double newScore) {
 		scores.remove(wh);
-		sum -= wh.getScore();
-		WeightedHost nwh = new WeightedHost(wh.getHost(), newScore);
+		sum -= wh.getTScore();
+		WeightedHost nwh = new WeightedHost(wh.getHost(), newScore, wh.getLoad());
 		weightedHosts.put(wh.getHost(), nwh);
 		scores.add(nwh);
-		sum += newScore;
+		sum += nwh.getTScore();
 	}
 
 	public synchronized double remove(WeightedHost wh) {
@@ -57,7 +59,7 @@
 	}
 
 	public Iterator iterator() {
-		return scores.iterator(); 
+		return scores.iterator();
 	}
 
 	public WeightedHost last() {
@@ -85,12 +87,15 @@
 		scores = new TreeSet();
 		while (i.hasNext()) {
 			WeightedHost wh = (WeightedHost) i.next();
-			WeightedHost nwh = new WeightedHost(wh.getHost(), wh.getScore()
-					* renormalizationFactor);
+			WeightedHost nwh = new WeightedHost(wh.getHost(), wh.getScore() * renormalizationFactor);
 			add(nwh);
 		}
 	}
 
+	public boolean isEmpty() {
+		return scores.isEmpty();
+	}
+
 	public String toString() {
 		return scores.toString();
 	}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
     
      
      
      From: <ha...@us...> - 2006-12-07 17:26:03
       
   | 
Revision: 1449
          http://svn.sourceforge.net/cogkit/?rev=1449&view=rev
Author:   hategan
Date:     2006-12-07 09:26:00 -0800 (Thu, 07 Dec 2006)
Log Message:
-----------
fail when no resources can be found for a task and no tasks are running
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java	2006-12-07 17:24:30 UTC (rev 1448)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java	2006-12-07 17:26:00 UTC (rev 1449)
@@ -16,6 +16,10 @@
 public class ContactAllocationTask extends TaskImpl {
 	private BoundContact contact;
 	private VariableStack stack;
+	
+	public ContactAllocationTask() {
+		setName("Contact allocation task");
+	}
 
 	public BoundContact getContact() {
 		return contact;
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2006-12-07 17:24:30 UTC (rev 1448)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2006-12-07 17:26:00 UTC (rev 1449)
@@ -152,7 +152,14 @@
 	}
 
 	protected TaskConstraints getTaskConstraints(Task t) {
-		return null;
+		Object constraints = super.getConstraints(t);
+        if (constraints instanceof Contact[]) {
+        	Contact[] c = (Contact[]) constraints;
+            if (c.length > 0 && c[0] != null) {
+            	return c[0].getConstraints();
+            }
+        }
+        return null;
 	}
 
 	public void enqueue(Task task, Object constraints) {
@@ -236,6 +243,10 @@
 					success = true;
 				}
 				catch (NoFreeResourceException e) {
+					if (running == 0) {
+						failUnsubmittedTask(t, "Could not find any valid host for task \"" + t
+								+ "\" with constraints " + getTaskConstraints(t), e);
+					}
 				}
 				catch (Exception e) {
 					failTask(t, "The scheduler could not execute the task", e);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
     
      
      
      From: <ha...@us...> - 2006-12-28 20:37:03
       
   | 
Revision: 1495
          http://svn.sourceforge.net/cogkit/?rev=1495&view=rev
Author:   hategan
Date:     2006-12-28 12:37:00 -0800 (Thu, 28 Dec 2006)
Log Message:
-----------
clean constraints; distinguish between no resources because of unsatisfiable constraints or load conditions
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Added Paths:
-----------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/NoSuchResourceException.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java	2006-12-28 20:35:59 UTC (rev 1494)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java	2006-12-28 20:37:00 UTC (rev 1495)
@@ -211,6 +211,12 @@
 			return constraints.get(task);
 		}
 	}
+	
+	protected void removeConstraints(Task task) {
+		synchronized(constraints) {
+			constraints.remove(task);
+		}
+	}
 
 	public void addTaskTransformer(TaskTransformer taskTransformer) {
 		this.taskTransformers.add(taskTransformer);
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java	2006-12-28 20:35:59 UTC (rev 1494)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/DefaultScheduler.java	2006-12-28 20:37:00 UTC (rev 1495)
@@ -65,21 +65,29 @@
 		checkGlobalLoadConditions();
 		int initial = contactCursor;
 		ContactSet resources = getResources();
-		while (!checkLoad(resources.get(contactCursor))
-				|| !checkConstraints(resources.get(contactCursor), t)) {
+		boolean incompatibleConstraints = true;
+		while (true) {
+			if (checkConstraints(resources.get(contactCursor), t)) {
+				incompatibleConstraints = false;
+				if (checkLoad(resources.get(contactCursor))) {
+					break;
+				}
+			}
 			incContactCursor();
 			if (contactCursor == initial) {
-				logger.debug("No free resources");
-				throw new NoFreeResourceException("No free hosts available");
+				if (incompatibleConstraints) {
+					throw new NoSuchResourceException();
+				}
+				else {
+					throw new NoFreeResourceException("No free hosts available");
+				}
 			}
 		}
-		if (initial == contactCursor) {
-			incContactCursor();
-		}
 		BoundContact contact = getResources().get(contactCursor);
 		if (logger.isDebugEnabled()) {
 			logger.debug("Contact: " + contact);
 		}
+		incContactCursor();
 		return contact;
 	}
 
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2006-12-28 20:35:59 UTC (rev 1494)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2006-12-28 20:37:00 UTC (rev 1495)
@@ -235,22 +235,26 @@
 					break;
 				}
 				Task t = (Task) getJobQueue().get(index);
+				boolean remove = true;
 				try {
 					submitUnbound(t);
-					synchronized (this) {
-						getJobQueue().remove(index);
-					}
 					success = true;
 				}
-				catch (NoFreeResourceException e) {
-					if (running == 0) {
-						failUnsubmittedTask(t, "Could not find any valid host for task \"" + t
+				catch (NoSuchResourceException e) {
+					failTask(t, "Could not find any valid host for task \"" + t
 								+ "\" with constraints " + getTaskConstraints(t), e);
-					}
 				}
+				catch (NoFreeResourceException e) {
+					remove = false;
+				}
 				catch (Exception e) {
 					failTask(t, "The scheduler could not execute the task", e);
 				}
+				if (remove) {
+					synchronized (this) {
+						getJobQueue().remove(index);
+					}
+				}
 				index++;
 			}
 		}
@@ -277,14 +281,6 @@
 		s.setMessage(message);
 		s.setException(e);
 		t.setStatus(s);
-	}
-
-	protected void failUnsubmittedTask(Task t, String message, Exception e) {
-		Status s = new StatusImpl();
-		s.setStatusCode(Status.FAILED);
-		s.setMessage(message);
-		s.setException(e);
-		t.setStatus(s);
 		fireJobStatusChangeEvent(t, s);
 	}
 
@@ -327,7 +323,7 @@
 					services[i] = resolveService((BoundContact) contacts[i], t.getType());
 				}
 				if (services[i] == null) {
-					failUnsubmittedTask(t, "Could not find a suitable service/provider for host "
+					failTask(t, "Could not find a suitable service/provider for host "
 							+ contacts[i], null);
 					return;
 				}
@@ -348,11 +344,10 @@
 			throw e;
 		}
 		catch (Exception e) {
-			if (e instanceof NullPointerException) {
-				e.printStackTrace();
-			}
-			logger.debug("Scheduler exception: job =" + t.getIdentity().getValue() + ", status = "
+			if (logger.isDebugEnabled()) {
+				logger.debug("Scheduler exception: job =" + t.getIdentity().getValue() + ", status = "
 					+ t.getStatus(), e);
+			}
 			Status status = t.getStatus();
 			status.setPrevStatusCode(status.getStatusCode());
 			status.setStatusCode(Status.FAILED);
@@ -556,6 +551,7 @@
 					synchronized (taskContacts) {
 						taskContacts.remove(task);
 					}
+					removeConstraints(task);
 
 					for (int i = 0; i < contacts.length; i++) {
 						BoundContact c = (BoundContact) contacts[i];
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/NoSuchResourceException.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/NoSuchResourceException.java	                        (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/NoSuchResourceException.java	2006-12-28 20:37:00 UTC (rev 1495)
@@ -0,0 +1,30 @@
+// ----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Sep 2, 2004
+ */
+package org.globus.cog.karajan.scheduler;
+
+public class NoSuchResourceException extends NoFreeResourceException {
+	private static final long serialVersionUID = 4757539377378613461L;
+
+	public NoSuchResourceException() {
+		super();
+	}
+
+	public NoSuchResourceException(String message) {
+		super(message);
+	}
+
+	public NoSuchResourceException(Throwable cause) {
+		super(cause);
+	}
+
+	public NoSuchResourceException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
\ No newline at end of file
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2006-12-28 20:35:59 UTC (rev 1494)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2006-12-28 20:37:00 UTC (rev 1495)
@@ -112,6 +112,16 @@
 		WeightedHost selected = null;
 
 		s = constrain(s, getConstraintChecker(), t);
+		
+		if (s.isEmpty()) {
+			throw new NoSuchResourceException();
+		}
+		
+		removeOverloaded(s);
+		
+		if (s.isEmpty()) {
+			throw new NoFreeResourceException();
+		}
 
 		double sum = s.getSum();
 		if (policy == POLICY_WEIGHTED_RANDOM) {
@@ -134,12 +144,7 @@
 				}
 			}
 			if (selected == null) {
-				if (s.isEmpty()) {
-					throw new NoFreeResourceException();
-				}
-				else {
-					selected = s.last();
-				}
+				selected = s.last();
 			}
 		}
 		else if (policy == POLICY_BEST_SCORE) {
@@ -181,18 +186,28 @@
 			Iterator i = s.iterator();
 			while (i.hasNext()) {
 				WeightedHost wh = (WeightedHost) i.next();
-				if (rcc.checkConstraints(wh.getHost(), tc) && notOverloaded(wh)) {
+				if (rcc.checkConstraints(wh.getHost(), tc)) {
 					ns.add(wh);
 				}
 			}
 			return ns;
 		}
 	}
+	
+	protected void removeOverloaded(WeightedHostSet s) {
+		 Iterator i = s.iterator();
+		while (i.hasNext()) {
+			WeightedHost wh = (WeightedHost) i.next();
+			if (overloaded(wh)) {
+				i.remove();
+			}
+		}
+	}
 
-	protected boolean notOverloaded(WeightedHost wh) {
+	protected boolean overloaded(WeightedHost wh) {
 		double score = wh.getTScore();
 		int load = wh.getLoad();
-		return load < jobThrottle * score + 2;
+		return !(load < jobThrottle * score + 2);
 	}
 
 	private static String[] propertyNames;
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
     
      
      
      From: <ha...@us...> - 2007-01-04 21:40:17
       
   | 
Revision: 1521
          http://svn.sourceforge.net/cogkit/?rev=1521&view=rev
Author:   hategan
Date:     2007-01-04 13:40:12 -0800 (Thu, 04 Jan 2007)
Log Message:
-----------
fixed a few issues
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java	2007-01-04 04:55:07 UTC (rev 1520)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java	2007-01-04 21:40:12 UTC (rev 1521)
@@ -6,6 +6,7 @@
 
 package org.globus.cog.karajan.scheduler;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -49,7 +50,7 @@
 	private final Map constraints;
 
 	private List taskTransformers, failureHandlers;
-	
+
 	private ResourceConstraintChecker constraintChecker;
 
 	public AbstractScheduler() {
@@ -125,29 +126,38 @@
 
 	public void addJobStatusListener(StatusListener l, Task task) {
 		List jobListeners;
-		if (listeners.containsKey(task)) {
-			jobListeners = (List) listeners.get(task);
+		synchronized (listeners) {
+			if (listeners.containsKey(task)) {
+				jobListeners = (List) listeners.get(task);
+			}
+			else {
+				jobListeners = new ArrayList();
+				listeners.put(task, jobListeners);
+			}
+			jobListeners.add(l);
 		}
-		else {
-			jobListeners = new LinkedList();
-			listeners.put(task, jobListeners);
-		}
-		jobListeners.add(l);
 	}
 
 	public void removeJobStatusListener(StatusListener l, Task task) {
-		if (listeners.containsKey(task)) {
-			List jobListeners = (List) listeners.get(task);
-			jobListeners.remove(l);
-			if (jobListeners.size() == 0) {
-				listeners.remove(task);
+		synchronized (listeners) {
+			if (listeners.containsKey(task)) {
+				List jobListeners = (List) listeners.get(task);
+				jobListeners.remove(l);
+				if (jobListeners.size() == 0) {
+					listeners.remove(task);
+				}
 			}
 		}
 	}
 
 	public void fireJobStatusChangeEvent(StatusEvent e) {
-		if (listeners.containsKey(e.getSource())) {
-			List jobListeners = new LinkedList((List) listeners.get(e.getSource()));
+		List jobListeners = null;
+		synchronized(listeners) {
+			if (listeners.containsKey(e.getSource())) {
+				jobListeners = new ArrayList((List) listeners.get(e.getSource()));
+			}
+		}
+		if (jobListeners != null) {
 			Iterator i = jobListeners.iterator();
 			while (i.hasNext()) {
 				((StatusListener) i.next()).statusChanged(e);
@@ -201,19 +211,19 @@
 	}
 
 	protected void setConstraints(Task task, Object constraint) {
-		synchronized(constraints) {
+		synchronized (constraints) {
 			constraints.put(task, constraint);
 		}
 	}
 
 	protected Object getConstraints(Task task) {
-		synchronized(constraints) {
+		synchronized (constraints) {
 			return constraints.get(task);
 		}
 	}
-	
+
 	protected void removeConstraints(Task task) {
-		synchronized(constraints) {
+		synchronized (constraints) {
 			constraints.remove(task);
 		}
 	}
@@ -233,7 +243,7 @@
 			((TaskTransformer) i.next()).transformTask(t, contacts, services);
 		}
 	}
-	
+
 	protected boolean runFailureHandlers(Task t) {
 		Iterator i = failureHandlers.iterator();
 		while (i.hasNext()) {
@@ -244,7 +254,7 @@
 		}
 		return false;
 	}
-	
+
 	public void addFailureHandler(FailureHandler handler) {
 		failureHandlers.add(handler);
 	}
@@ -256,7 +266,7 @@
 	public void setConstraintChecker(ResourceConstraintChecker constraintChecker) {
 		this.constraintChecker = constraintChecker;
 	}
-	
+
 	protected boolean checkConstraints(BoundContact resource, TaskConstraints tc) {
 		if (constraintChecker == null) {
 			return true;
@@ -265,7 +275,7 @@
 			return constraintChecker.checkConstraints(resource, tc);
 		}
 	}
-	
+
 	protected List checkConstraints(List resources, TaskConstraints tc) {
 		if (constraintChecker == null) {
 			return resources;
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2007-01-04 04:55:07 UTC (rev 1520)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2007-01-04 21:40:12 UTC (rev 1521)
@@ -40,6 +40,7 @@
 	public static final String FACTOR_FAILURE = "failureFactor";
 	public static final String SCORE_HIGH_CAP = "scoreHighCap";
 	public static final String POLICY = "policy";
+	public static final String JOB_THROTTLE = "jobThrottle";
 
 	private WeightedHostSet sorted;
 	private int policy;
@@ -112,13 +113,13 @@
 		WeightedHost selected = null;
 
 		s = constrain(s, getConstraintChecker(), t);
-		
+
 		if (s.isEmpty()) {
 			throw new NoSuchResourceException();
 		}
-		
-		removeOverloaded(s);
-		
+
+		s = removeOverloaded(s);
+
 		if (s.isEmpty()) {
 			throw new NoFreeResourceException();
 		}
@@ -161,7 +162,7 @@
 		return selected.getHost();
 	}
 
-	public void releaseContact(BoundContact contact) {
+	public synchronized void releaseContact(BoundContact contact) {
 		if (logger.isDebugEnabled()) {
 			logger.debug("Releasing contact " + contact);
 		}
@@ -193,15 +194,29 @@
 			return ns;
 		}
 	}
-	
-	protected void removeOverloaded(WeightedHostSet s) {
-		 Iterator i = s.iterator();
-		while (i.hasNext()) {
-			WeightedHost wh = (WeightedHost) i.next();
-			if (overloaded(wh)) {
-				i.remove();
+
+	protected WeightedHostSet removeOverloaded(WeightedHostSet s) {
+		if (s == sorted) {
+			WeightedHostSet ns = new WeightedHostSet(scoreHighCap);
+			Iterator i = s.iterator();
+			while (i.hasNext()) {
+				WeightedHost wh = (WeightedHost) i.next();
+				if (!overloaded(wh)) {
+					ns.add(wh);
+				}
 			}
+			return ns;
 		}
+		else {
+			Iterator i = s.iterator();
+			while (i.hasNext()) {
+				WeightedHost wh = (WeightedHost) i.next();
+				if (overloaded(wh)) {
+					i.remove();
+				}
+			}
+			return s;
+		}
 	}
 
 	protected boolean overloaded(WeightedHost wh) {
@@ -214,7 +229,7 @@
 	private static final String[] myPropertyNames = new String[] { POLICY,
 			FACTOR_CONNECTION_REFUSED, FACTOR_CONNECTION_TIMEOUT, FACTOR_SUBMISSION_TASK_LOAD,
 			FACTOR_TRANSFER_TASK_LOAD, FACTOR_FILEOP_TASK_LOAD, FACTOR_FAILURE, FACTOR_SUCCESS,
-			SCORE_HIGH_CAP };
+			SCORE_HIGH_CAP, JOB_THROTTLE };
 	private static Set propertyNamesSet;
 
 	static {
@@ -248,6 +263,9 @@
 					throw new KarajanRuntimeException("Unknown policy type: " + value);
 				}
 			}
+			else if (JOB_THROTTLE.equals(name)) {
+				jobThrottle = TypeUtil.toInt(value);
+			}
 			else {
 				double val = TypeUtil.toDouble(value);
 				try {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
     
      
      
      From: <ha...@us...> - 2007-01-05 03:53:12
       
   | 
Revision: 1525
          http://svn.sourceforge.net/cogkit/?rev=1525&view=rev
Author:   hategan
Date:     2007-01-04 19:53:10 -0800 (Thu, 04 Jan 2007)
Log Message:
-----------
some adjustments
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2007-01-05 03:52:39 UTC (rev 1524)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2007-01-05 03:53:10 UTC (rev 1525)
@@ -277,6 +277,7 @@
 
 	protected void failTask(Task t, String message, Exception e) {
 		Status s = new StatusImpl();
+		s.setPrevStatusCode(t.getStatus().getStatusCode());
 		s.setStatusCode(Status.FAILED);
 		s.setMessage(message);
 		s.setException(e);
@@ -348,13 +349,7 @@
 				logger.debug("Scheduler exception: job =" + t.getIdentity().getValue() + ", status = "
 					+ t.getStatus(), e);
 			}
-			Status status = t.getStatus();
-			status.setPrevStatusCode(status.getStatusCode());
-			status.setStatusCode(Status.FAILED);
-			status.setException(e);
-			status.setMessage(e.toString());
-			StatusEvent se = new StatusEvent(t, status);
-			fireJobStatusChangeEvent(se);
+			failTask(t, e.toString(), e);
 			return;
 		}
 	}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2007-01-05 03:52:39 UTC (rev 1524)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2007-01-05 03:53:10 UTC (rev 1525)
@@ -298,6 +298,11 @@
 			Task t = (Task) e.getSource();
 			int code = e.getStatus().getStatusCode();
 			Contact[] contacts = getContacts(t);
+			
+			if (contacts == null) {
+				return;
+			}
+			
 			if (code == Status.SUBMITTED) {
 				// this isn't reliable
 				// factorSubmission(t, contacts, 1);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
     
      
      
      From: <ha...@us...> - 2007-02-28 01:49:50
       
   | 
Revision: 1596
          http://svn.sourceforge.net/cogkit/?rev=1596&view=rev
Author:   hategan
Date:     2007-02-27 17:49:48 -0800 (Tue, 27 Feb 2007)
Log Message:
-----------
fixed a bunch of leaks
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java	2007-02-28 01:48:31 UTC (rev 1595)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java	2007-02-28 01:49:48 UTC (rev 1596)
@@ -152,7 +152,7 @@
 
 	public void fireJobStatusChangeEvent(StatusEvent e) {
 		List jobListeners = null;
-		synchronized(listeners) {
+		synchronized (listeners) {
 			if (listeners.containsKey(e.getSource())) {
 				jobListeners = new ArrayList((List) listeners.get(e.getSource()));
 			}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java	2007-02-28 01:48:31 UTC (rev 1595)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/ContactAllocationTask.java	2007-02-28 01:49:48 UTC (rev 1596)
@@ -12,9 +12,11 @@
 import org.globus.cog.abstraction.impl.common.task.TaskImpl;
 import org.globus.cog.karajan.stack.VariableStack;
 import org.globus.cog.karajan.util.BoundContact;
+import org.globus.cog.karajan.util.Contact;
 
 public class ContactAllocationTask extends TaskImpl {
 	private BoundContact contact;
+	private Contact virtualContact;
 	private VariableStack stack;
 	
 	public ContactAllocationTask() {
@@ -40,4 +42,12 @@
 	public int getRequiredServices() {
 		return 1;
 	}
+
+	public void setVirtualContact(Contact vc) {
+		this.virtualContact = vc;
+	}
+	
+	public Contact getVirtualContact() {
+		return this.virtualContact;
+	}
 }
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2007-02-28 01:48:31 UTC (rev 1595)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2007-02-28 01:49:48 UTC (rev 1596)
@@ -106,12 +106,12 @@
 		return allocateContact(null);
 	}
 
-	public synchronized void releaseContact(BoundContact contact) {
-		virtualContacts.remove(contact.getHost());
+	public synchronized void releaseContact(Contact contact) {
+		virtualContacts.remove(contact);
 		tasksFinished = true;
 	}
 
-	public synchronized BoundContact resolveVirtualContact(Task t, Contact contact)
+	public synchronized BoundContact resolveVirtualContact(Contact contact)
 			throws NoFreeResourceException {
 		if (contact.isVirtual()) {
 			BoundContact next;
@@ -382,7 +382,7 @@
 		}
 		else {
 			if (contact.isVirtual()) {
-				boundContact = resolveVirtualContact(t, contact);
+				boundContact = resolveVirtualContact(contact);
 			}
 			else {
 				boundContact = (BoundContact) contact;
@@ -447,6 +447,7 @@
 			throws TaskSubmissionException {
 		if (t instanceof ContactAllocationTask) {
 			((ContactAllocationTask) t).setContact((BoundContact) contacts[0]);
+			removeConstraints(t);
 			Status status = t.getStatus();
 			status.setPrevStatusCode(status.getStatusCode());
 			status.setStatusCode(Status.COMPLETED);
@@ -454,6 +455,14 @@
 			fireJobStatusChangeEvent(se);
 			return;
 		}
+		for (int i = 0; i < contacts.length; i++) {
+			if (!(contacts[i] instanceof BoundContact)) {
+				throw new TaskSubmissionException(
+						"submitBoundToServices called but at least a contact is not bound");
+			}
+			BoundContact c = (BoundContact) contacts[i];
+			c.setActiveTasks(c.getActiveTasks() + 1);
+		}
 		applyTaskTransformers(t, contacts, services);
 		t.addStatusListener(this);
 		TaskHandler handler;
@@ -467,14 +476,6 @@
 		synchronized (taskContacts) {
 			taskContacts.put(t, contacts);
 		}
-		for (int i = 0; i < contacts.length; i++) {
-			if (!(contacts[i] instanceof BoundContact)) {
-				throw new TaskSubmissionException(
-						"submitBoundToServices called but at least a contact is not bound");
-			}
-			BoundContact c = (BoundContact) contacts[i];
-			c.setActiveTasks(c.getActiveTasks() + 1);
-		}
 		if (logger.isDebugEnabled()) {
 			logger.debug("Submitting task " + t);
 		}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java	2007-02-28 01:48:31 UTC (rev 1595)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java	2007-02-28 01:49:48 UTC (rev 1596)
@@ -10,7 +10,6 @@
 
 import org.globus.cog.abstraction.interfaces.StatusListener;
 import org.globus.cog.abstraction.interfaces.Task;
-import org.globus.cog.karajan.util.BoundContact;
 import org.globus.cog.karajan.util.Contact;
 import org.globus.cog.karajan.util.ContactSet;
 import org.globus.cog.karajan.util.TaskHandlerWrapper;
@@ -41,7 +40,7 @@
 	 * Can be used to tell the scheduler that a previously allocated contact
 	 * (using allocateContact()) is not used any more.
 	 */
-	void releaseContact(BoundContact sc);
+	void releaseContact(Contact sc);
 
 	/**
 	 * Sets the set of resources that the scheduler will use
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2007-02-28 01:48:31 UTC (rev 1595)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2007-02-28 01:49:48 UTC (rev 1596)
@@ -78,9 +78,9 @@
 
 	public void setResources(ContactSet grid) {
 		super.setResources(grid);
-        if (grid.getContacts() == null) {
-            return;
-        }
+		if (grid.getContacts() == null) {
+			return;
+		}
 		sorted = new WeightedHostSet(scoreHighCap);
 		Iterator i = grid.getContacts().iterator();
 		while (i.hasNext()) {
@@ -184,26 +184,33 @@
 			throw new KarajanRuntimeException("Invalid policy number: " + policy);
 		}
 		if (logger.isDebugEnabled()) {
-			logger.debug("Next contact: " + selected.getHost());
+			logger.debug("Next contact: " + selected);
 		}
 		sorted.changeLoad(selected, 1);
 		selected.setDelayedDelta(successFactor);
 		return selected.getHost();
 	}
 
-	public synchronized void releaseContact(BoundContact contact) {
+	public synchronized void releaseContact(Contact contact) {
 		if (logger.isDebugEnabled()) {
 			logger.debug("Releasing contact " + contact);
 		}
-		super.releaseContact(contact);
-		WeightedHost wh = sorted.findHost(contact);
-		if (wh != null) {
-			change = true;
-			sorted.changeLoad(wh, -1);
-			sorted.changeScore(wh, wh.getScore() + wh.getDelayedDelta());
+		try {
+			BoundContact bc = this.resolveVirtualContact(contact);
+			super.releaseContact(contact);
+
+			WeightedHost wh = sorted.findHost(bc);
+			if (wh != null) {
+				change = true;
+				sorted.changeLoad(wh, -1);
+				sorted.changeScore(wh, wh.getScore() + wh.getDelayedDelta());
+			}
+			else {
+				logger.warn("ghost contact (" + contact + ") in releaseContact");
+			}
 		}
-		else {
-			logger.warn("ghost contact (" + contact + ") in releaseContact");
+		catch (NoFreeResourceException e) {
+			logger.warn("Failed to release contact " + contact, e);
 		}
 	}
 
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
     
      
      
      From: <ha...@us...> - 2007-08-24 00:51:43
       
   | 
Revision: 1723
          http://cogkit.svn.sourceforge.net/cogkit/?rev=1723&view=rev
Author:   hategan
Date:     2007-08-23 17:51:42 -0700 (Thu, 23 Aug 2007)
Log Message:
-----------
some failures didn't properly clean up
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2007-08-24 00:50:52 UTC (rev 1722)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2007-08-24 00:51:42 UTC (rev 1723)
@@ -9,8 +9,10 @@
  */
 package org.globus.cog.karajan.scheduler;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.log4j.Logger;
@@ -297,6 +299,9 @@
 	}
 
 	protected void failTask(Task t, String message, Exception e) {
+		if (logger.isDebugEnabled()) {
+			logger.debug("Failing task " + t + " because of " + message, e);
+		}
 		Status s = new StatusImpl();
 		s.setPrevStatusCode(t.getStatus().getStatusCode());
 		s.setStatusCode(Status.FAILED);
@@ -305,6 +310,8 @@
 		t.setStatus(s);
 		fireJobStatusChangeEvent(t, s);
 	}
+	
+	private List contactTran = new ArrayList();
 
 	void submitUnbound(Task t) throws NoFreeResourceException {
 		try {
@@ -312,6 +319,7 @@
 				return;
 			}
 			checkTaskLoadConditions(t);
+			contactTran.clear();
 			Service[] services = new Service[t.getRequiredServices()];
 			Contact[] contacts;
 			Object constraints = getConstraints(t);
@@ -319,6 +327,7 @@
 				contacts = (Contact[]) constraints;
 				if (contacts == null) {
 					contacts = new Contact[] { this.getNextContact(t) };
+					contactTran.add(contacts[0]);
 				}
 			}
 			else {
@@ -326,13 +335,15 @@
 				for (int i = 0; i < t.getRequiredServices(); i++) {
 					if (t.getService(i) == null) {
 						contacts[i] = this.getNextContact(t);
+						contactTran.add(contacts[i]);
 					}
 				}
 			}
-
+ 
 			for (int i = 0; i < services.length; i++) {
 				if (contacts[i] != null && contacts[i].isVirtual()) {
 					contacts[i] = resolveContact(t, contacts[i]);
+					contactTran.add(contacts[i]);
 				}
 				try {
 					services[i] = t.getService(i);
@@ -357,11 +368,15 @@
 					((JobSpecification) t.getSpecification()).setAttribute("project", project);
 				}
 			}
-
+			
 			submitBoundToServices(t, contacts, services);
 			logger.debug("No host specified");
 		}
 		catch (NoFreeResourceException e) {
+			Iterator i = contactTran.iterator();
+			while (i.hasNext()) {
+				releaseContact((Contact) i.next());
+			}
 			throw e;
 		}
 		catch (Exception e) {
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2007-08-24 00:50:52 UTC (rev 1722)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2007-08-24 00:51:42 UTC (rev 1723)
@@ -109,7 +109,7 @@
 		wh.setDelayedDelta(wh.getDelayedDelta() + factor);
 	}
 
-	protected double factor(double score, double factor) {
+	protected final double factor(double score, double factor) {
 		return score + factor;
 	}
 
@@ -186,10 +186,13 @@
 		if (logger.isDebugEnabled()) {
 			logger.debug("Next contact: " + selected);
 		}
+		
 		sorted.changeLoad(selected, 1);
 		selected.setDelayedDelta(successFactor);
 		return selected.getHost();
 	}
+	
+	
 
 	public synchronized void releaseContact(Contact contact) {
 		if (logger.isDebugEnabled()) {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
     
      
      
      From: <ha...@us...> - 2007-08-28 16:38:47
       
   | 
Revision: 1728
          http://cogkit.svn.sourceforge.net/cogkit/?rev=1728&view=rev
Author:   hategan
Date:     2007-08-28 09:38:34 -0700 (Tue, 28 Aug 2007)
Log Message:
-----------
Accept "off" as a valid throttle value
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java	2007-08-24 02:55:34 UTC (rev 1727)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java	2007-08-28 16:38:34 UTC (rev 1728)
@@ -32,6 +32,8 @@
 public abstract class AbstractScheduler extends Thread implements Scheduler {
 
 	private static final Logger logger = Logger.getLogger(AbstractScheduler.class);
+	
+	public static final int THROTTLE_OFF = 100000000;
 
 	private List taskHandlers;
 
@@ -185,13 +187,22 @@
 	public void setProperty(String name, Object value) {
 		if (name.equalsIgnoreCase("maxSimultaneousJobs")) {
 			logger.debug("Scheduler: setting maxSimultaneousJobs to " + value);
-			setMaxSimultaneousJobs(TypeUtil.toInt(value));
+			setMaxSimultaneousJobs(throttleValue(value));
 		}
 		else {
 			throw new IllegalArgumentException("Unsupported property: " + name
 					+ ". Supported properties are: " + Arrays.asList(this.getPropertyNames()));
 		}
 	}
+	
+	protected int throttleValue(Object value) {
+	    if ("off".equalsIgnoreCase(value.toString())) {
+	        return THROTTLE_OFF; 
+	    }
+	    else {
+	        return TypeUtil.toInt(value);
+	    }
+	}
 
 	public Object getProperty(String name) {
 		return properties.get(name);
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2007-08-24 02:55:34 UTC (rev 1727)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2007-08-28 16:38:34 UTC (rev 1728)
@@ -538,22 +538,22 @@
 	public void setProperty(String name, Object value) {
 		if (name.equalsIgnoreCase(JOBS_PER_CPU)) {
 			logger.debug("Scheduler: setting jobsPerCpu to " + value);
-			jobsPerCPU = TypeUtil.toInt(value);
+			jobsPerCPU = throttleValue(value);
 		}
 		else if (name.equalsIgnoreCase(SUBMIT_THROTTLE)) {
-			submitQueue.setThrottle(TypeUtil.toInt(value));
+			submitQueue.setThrottle(throttleValue(value));
 		}
 		else if (name.equalsIgnoreCase(HOST_SUBMIT_THROTTLE)) {
-			submitQueue.setHostThrottle(TypeUtil.toInt(value));
+			submitQueue.setHostThrottle(throttleValue(value));
 		}
 		else if (name.equalsIgnoreCase(MAX_TRANSFERS)) {
-			maxTransfers = TypeUtil.toInt(value);
+			maxTransfers = throttleValue(value);
 		}
 		else if (name.equalsIgnoreCase(SSH_INITIAL_RATE)) {
 			sshInitialRate = TypeUtil.toInt(value);
 		}
 		else if (name.equalsIgnoreCase(MAX_FILE_OPERATIONS)) {
-			maxFileOperations = TypeUtil.toInt(value);
+			maxFileOperations = throttleValue(value);
 		}
 		else {
 			super.setProperty(name, value);
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2007-08-24 02:55:34 UTC (rev 1727)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2007-08-28 16:38:34 UTC (rev 1728)
@@ -298,7 +298,7 @@
 				}
 			}
 			else if (JOB_THROTTLE.equals(name)) {
-				jobThrottle = TypeUtil.toInt(value);
+				jobThrottle = throttleValue(value);
 			}
 			else {
 				double val = TypeUtil.toDouble(value);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
     
      
      
      From: <ha...@us...> - 2008-02-12 17:35:17
       
   | 
Revision: 1896
          http://cogkit.svn.sourceforge.net/cogkit/?rev=1896&view=rev
Author:   hategan
Date:     2008-02-12 09:35:15 -0800 (Tue, 12 Feb 2008)
Log Message:
-----------
added method to access task constraints
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java	2008-02-12 17:33:40 UTC (rev 1895)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java	2008-02-12 17:35:15 UTC (rev 1896)
@@ -221,13 +221,13 @@
 		return combined;
 	}
 
-	protected void setConstraints(Task task, Object constraint) {
+	public void setConstraints(Task task, Object constraint) {
 		synchronized (constraints) {
 			constraints.put(task, constraint);
 		}
 	}
 
-	protected Object getConstraints(Task task) {
+	public Object getConstraints(Task task) {
 		synchronized (constraints) {
 			return constraints.get(task);
 		}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java	2008-02-12 17:33:40 UTC (rev 1895)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/Scheduler.java	2008-02-12 17:35:15 UTC (rev 1896)
@@ -102,4 +102,6 @@
 	 * Allows handling task failures at the scheduler level
 	 */
 	void addFailureHandler(FailureHandler handler);
+	
+	Object getConstraints(Task task);
 }
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
     
      
      
      From: <ha...@us...> - 2008-02-12 17:43:48
       
   | 
Revision: 1899
          http://cogkit.svn.sourceforge.net/cogkit/?rev=1899&view=rev
Author:   hategan
Date:     2008-02-12 09:43:46 -0800 (Tue, 12 Feb 2008)
Log Message:
-----------
jobThrottle is now a float; the time it takes to submit a job is now part of the score feedback loop. By default it is set to stabilize at a 20s submission time.
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java	2008-02-12 17:36:49 UTC (rev 1898)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java	2008-02-12 17:43:46 UTC (rev 1899)
@@ -197,12 +197,21 @@
 	
 	protected int throttleValue(Object value) {
 	    if ("off".equalsIgnoreCase(value.toString())) {
-	        return THROTTLE_OFF; 
+	        return THROTTLE_OFF;
 	    }
 	    else {
 	        return TypeUtil.toInt(value);
 	    }
 	}
+	
+	protected float floatThrottleValue(Object value) {
+	    if ("off".equalsIgnoreCase(value.toString())) {
+	        return THROTTLE_OFF;
+	    }
+	    else {
+	        return (float) TypeUtil.toDouble(value);
+	    }
+	}
 
 	public Object getProperty(String name) {
 		return properties.get(name);
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java	2008-02-12 17:36:49 UTC (rev 1898)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java	2008-02-12 17:43:46 UTC (rev 1899)
@@ -20,17 +20,17 @@
 	private double tscore;
 	private int load;
 	private double delayedDelta;
-	private int jobThrottle;
+	private float jobThrottle;
 
-	public WeightedHost(BoundContact contact, int jobThrottle) {
+	public WeightedHost(BoundContact contact, float jobThrottle) {
 		this(contact, 0.0, jobThrottle);
 	}
 
-	public WeightedHost(BoundContact contact, double score, int jobThrottle) {
+	public WeightedHost(BoundContact contact, double score, float jobThrottle) {
 		this(contact, score, 0, jobThrottle);
 	}
 
-	public WeightedHost(BoundContact contact, double score, int load, int jobThrottle) {
+	public WeightedHost(BoundContact contact, double score, int load, float jobThrottle) {
 		this.host = contact;
 		setScore(score);
 		this.load = load;
@@ -129,7 +129,7 @@
 		return !(load < jobThrottle * tscore + 2);
 	}
 
-	public int getJobThrottle() {
+	public float getJobThrottle() {
 		return jobThrottle;
 	}
 }
\ No newline at end of file
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2008-02-12 17:36:49 UTC (rev 1898)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2008-02-12 17:43:46 UTC (rev 1899)
@@ -10,6 +10,7 @@
 package org.globus.cog.karajan.scheduler;
 
 import java.lang.reflect.Field;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
@@ -41,6 +42,7 @@
 	public static final String SCORE_HIGH_CAP = "scoreHighCap";
 	public static final String POLICY = "policy";
 	public static final String JOB_THROTTLE = "jobThrottle";
+	public static final String MAX_SUBMISSION_TIME = "maxSubmissionTime";
 
 	private WeightedHostSet sorted;
 	private int policy;
@@ -50,10 +52,12 @@
 	 */
 	private double connectionRefusedFactor, connectionTimeoutFactor, jobSubmissionTaskLoadFactor,
 			transferTaskLoadFactor, fileOperationTaskLoadFactor, successFactor, failureFactor,
-			scoreHighCap;
+			scoreHighCap, maxSubmissionTime;
 
-	private int jobThrottle;
+	private float jobThrottle;
 
+	private double submissionTimeBias, submissionTimeFactor;
+
 	private boolean change;
 	private TaskConstraints cachedConstraints;
 	private boolean cachedLoadState;
@@ -74,8 +78,24 @@
 		failureFactor = -0.5;
 		scoreHighCap = 100;
 		jobThrottle = 2;
+		maxSubmissionTime = 20;
+		updateInternal();
 	}
 
+	// This isn't very accurate since it varies by provider
+	// and with submission parallelism
+	// What it means is that if submission time is exactly this much
+	// then a success should increase the score by successFactor
+	public final double BASE_SUBMISSION_TIME = 0.5;
+
+	protected void updateInternal() {
+		if (maxSubmissionTime < BASE_SUBMISSION_TIME) {
+			throw new IllegalArgumentException("maxSubmissionTime must be > " + BASE_SUBMISSION_TIME);
+		}
+		submissionTimeFactor = -successFactor / (maxSubmissionTime - BASE_SUBMISSION_TIME);
+		submissionTimeBias = -BASE_SUBMISSION_TIME * submissionTimeFactor;
+	}
+
 	public void setResources(ContactSet grid) {
 		super.setResources(grid);
 		if (grid.getContacts() == null) {
@@ -92,7 +112,7 @@
 		sorted.add(wh);
 	}
 
-	protected synchronized void multiplyScore(WeightedHost wh, double factor) {
+	protected synchronized void factorScore(WeightedHost wh, double factor) {
 		double score = wh.getScore();
 		if (logger.isDebugEnabled()) {
 			logger.debug("multiplyScore(" + wh + ", " + factor + ")");
@@ -105,7 +125,7 @@
 		}
 	}
 
-	protected synchronized void multiplyScoreLater(WeightedHost wh, double factor) {
+	protected synchronized void factorScoreLater(WeightedHost wh, double factor) {
 		wh.setDelayedDelta(wh.getDelayedDelta() + factor);
 	}
 
@@ -186,13 +206,11 @@
 		if (logger.isDebugEnabled()) {
 			logger.debug("Next contact: " + selected);
 		}
-		
+
 		sorted.changeLoad(selected, 1);
 		selected.setDelayedDelta(successFactor);
 		return selected.getHost();
 	}
-	
-	
 
 	public synchronized void releaseContact(Contact contact) {
 		if (logger.isDebugEnabled()) {
@@ -263,7 +281,7 @@
 	private static final String[] myPropertyNames = new String[] { POLICY,
 			FACTOR_CONNECTION_REFUSED, FACTOR_CONNECTION_TIMEOUT, FACTOR_SUBMISSION_TASK_LOAD,
 			FACTOR_TRANSFER_TASK_LOAD, FACTOR_FILEOP_TASK_LOAD, FACTOR_FAILURE, FACTOR_SUCCESS,
-			SCORE_HIGH_CAP, JOB_THROTTLE };
+			SCORE_HIGH_CAP, JOB_THROTTLE, MAX_SUBMISSION_TIME };
 	private static Set propertyNamesSet;
 
 	static {
@@ -298,7 +316,7 @@
 				}
 			}
 			else if (JOB_THROTTLE.equals(name)) {
-				jobThrottle = throttleValue(value);
+				jobThrottle = floatThrottleValue(value);
 			}
 			else {
 				double val = TypeUtil.toDouble(value);
@@ -315,6 +333,7 @@
 					throw new KarajanRuntimeException("Failed to set property '" + name + "'", e);
 				}
 			}
+			updateInternal();
 		}
 		else {
 			super.setProperty(name, value);
@@ -337,6 +356,8 @@
 				return;
 			}
 
+			checkSubmissionTime(code, e.getStatus(), t, contacts);
+
 			if (code == Status.SUBMITTED) {
 				// this isn't reliable
 				// factorSubmission(t, contacts, 1);
@@ -368,6 +389,33 @@
 		}
 	}
 
+	public static final String TASK_ATTR_SUBMISSION_TIME = "scheduler:submissionTime";
+
+	private void checkSubmissionTime(int code, Status s, Task t, Contact[] contacts) {
+		synchronized (t) {
+			if (t.getType() == Task.JOB_SUBMISSION) {
+				if (code == Status.SUBMITTING) {
+					t.setAttribute(TASK_ATTR_SUBMISSION_TIME, s.getTime());
+				}
+				else {
+					Date st = (Date) t.getAttribute(TASK_ATTR_SUBMISSION_TIME);
+					if (st != null) {
+						Date st2 = s.getTime();
+						long submissionTime = st2.getTime() - st.getTime();
+						t.setAttribute(TASK_ATTR_SUBMISSION_TIME, null);
+						double delta = submissionTimeBias + submissionTimeFactor * submissionTime
+								/ 1000;
+						if (logger.isDebugEnabled()) {
+							logger.debug("Submission time for " + t + ": " + submissionTime
+									+ "ms. Score delta: " + delta);
+						}
+						factorMultiple(contacts, delta);
+					}
+				}
+			}
+		}
+	}
+
 	private void factorSubmission(Task t, Contact[] contacts, int exp) {
 		// I wonder where the line between abstraction and obfuscation is...
 		if (t.getType() == Task.JOB_SUBMISSION) {
@@ -398,7 +446,7 @@
 			BoundContact bc = (BoundContact) contacts[i];
 			WeightedHost wh = sorted.findHost(bc);
 			if (wh != null) {
-				multiplyScore(wh, factor);
+				factorScore(wh, factor);
 			}
 		}
 	}
@@ -408,7 +456,7 @@
 			BoundContact bc = (BoundContact) contacts[i];
 			WeightedHost wh = sorted.findHost(bc);
 			if (wh != null) {
-				multiplyScoreLater(wh, factor);
+				factorScoreLater(wh, factor);
 			}
 		}
 	}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
     
      
      
      From: <ha...@us...> - 2008-05-08 03:50:24
       
   | 
Revision: 2003
          http://cogkit.svn.sourceforge.net/cogkit/?rev=2003&view=rev
Author:   hategan
Date:     2008-05-07 20:50:15 -0700 (Wed, 07 May 2008)
Log Message:
-----------
make canceling of tasks non blocking
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
Added Paths:
-----------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/NonBlockingCancel.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2008-05-08 03:49:11 UTC (rev 2002)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2008-05-08 03:50:15 UTC (rev 2003)
@@ -34,6 +34,7 @@
 import org.globus.cog.karajan.scheduler.submitQueue.GlobalSubmitQueue;
 import org.globus.cog.karajan.scheduler.submitQueue.HostSubmitQueue;
 import org.globus.cog.karajan.scheduler.submitQueue.InstanceSubmitQueue;
+import org.globus.cog.karajan.scheduler.submitQueue.NonBlockingCancel;
 import org.globus.cog.karajan.scheduler.submitQueue.NonBlockingSubmit;
 import org.globus.cog.karajan.scheduler.submitQueue.SubmitQueue;
 import org.globus.cog.karajan.util.BoundContact;
@@ -650,13 +651,7 @@
 	public void cancelTask(Task task) {
 		TaskHandler handler = getHandler(task);
 		if (handler != null) {
-			try {
-				handler.cancel(task);
-			}
-			catch (Exception e) {
-				// force it
-				task.setStatus(Status.CANCELED);
-			}
+			new NonBlockingCancel(handler, task).go();
 		}
 	}
 
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/NonBlockingCancel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/NonBlockingCancel.java	                        (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/NonBlockingCancel.java	2008-05-08 03:50:15 UTC (rev 2003)
@@ -0,0 +1,40 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on May 7, 2008
+ */
+package org.globus.cog.karajan.scheduler.submitQueue;
+
+import org.globus.cog.abstraction.interfaces.Status;
+import org.globus.cog.abstraction.interfaces.Task;
+import org.globus.cog.abstraction.interfaces.TaskHandler;
+
+public class NonBlockingCancel extends NonBlockingSubmit {
+
+	public NonBlockingCancel(TaskHandler th, Task task) {
+		super(th, task, null);
+	}
+	
+	public void run() {
+		try {
+			getTaskHandler().cancel(getTask());
+			notifyPreviousQueue(null);
+		}
+		catch (Exception e) {
+			//force it
+			getTask().setStatus(Status.CANCELED);
+			notifyPreviousQueue(e);
+		}
+		catch (ThreadDeath td) {
+			throw td;
+		}
+		catch (Throwable t) {
+			notifyPreviousQueue(new Exception(t));
+			t.printStackTrace();
+		}
+	}
+}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
     
      
      
      From: <b_...@us...> - 2008-06-25 22:47:59
       
   | 
Revision: 2058
          http://cogkit.svn.sourceforge.net/cogkit/?rev=2058&view=rev
Author:   b_z_c
Date:     2008-06-25 15:47:10 -0700 (Wed, 25 Jun 2008)
Log Message:
-----------
hopefully better behaviour in the presence of bad sites
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java	2008-06-25 13:14:02 UTC (rev 2057)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java	2008-06-25 22:47:10 UTC (rev 2058)
@@ -11,6 +11,7 @@
 
 import java.text.DecimalFormat;
 import java.text.NumberFormat;
+import org.apache.log4j.Logger;
 
 import org.globus.cog.karajan.util.BoundContact;
 
@@ -18,12 +19,15 @@
 
 	static final int MINWEIGHT = -10;
 
+	private static final Logger logger = Logger.getLogger(WeightedHost.class);
+
 	private BoundContact host;
 	private Double score;
 	private double tscore;
 	private int load;
 	private double delayedDelta;
 	private float jobThrottle;
+	private long lastUsed;
 
 	public WeightedHost(BoundContact contact, float jobThrottle) {
 		this(contact, 0.0, jobThrottle);
@@ -63,7 +67,8 @@
 	}
 
 	public final double getTScore() {
-		return tscore;
+		if(tscore >= 1) return tscore;
+		if(isOverloaded()) return 0; else return 1;
 	}
 
 	public final BoundContact getHost() {
@@ -130,7 +135,25 @@
 	}
 
 	public boolean isOverloaded() {
-		return !(load < maxLoad());
+		double ml = maxLoad();
+		if(tscore >= 1) {
+			// the site is mostly good. permit 1 or more jobs
+			// always.
+			logger.info("In load mode. score = "+score+" tscore = "+tscore+", maxload="+ml);
+			return !(load <= ml);
+		} else {
+			// the site is mostly bad. allow either 1 or 0 jobs
+			// based on time.
+			long now = System.currentTimeMillis();
+			long delay = now - lastUsed;
+			long permittedDelay = (long)(Math.exp(-(score.doubleValue()))*100.0);
+			boolean overloaded=(delay<permittedDelay);
+			// tscore of -1 will give delay of around 
+			// 200ms, and will double every time tscore goes
+			// down by one (which is once per failed job? roughly?)
+			logger.info("In delay mode. score = "+score+" tscore = "+tscore+", maxload="+ml+" delay since last used="+delay+"ms"+" permitted delay="+permittedDelay+"ms overloaded="+overloaded);
+			return overloaded;
+		}
 	}
 
 	public float getJobThrottle() {
@@ -138,6 +161,10 @@
 	}
 
 	public double maxLoad() {
-		return jobThrottle * tscore + 2;
+		return jobThrottle * tscore + 1;
 	}
+
+	public void notifyUsed() {
+		lastUsed = System.currentTimeMillis();
+	}
 }
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2008-06-25 13:14:02 UTC (rev 2057)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2008-06-25 22:47:10 UTC (rev 2058)
@@ -223,6 +223,7 @@
 
 		sorted.changeLoad(selected, 1);
 		selected.setDelayedDelta(successFactor);
+		selected.notifyUsed();
 		return selected.getHost();
 	}
 
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
     
      
      
      From: <b_...@us...> - 2008-06-30 16:08:19
       
   | 
Revision: 2065
          http://cogkit.svn.sourceforge.net/cogkit/?rev=2065&view=rev
Author:   b_z_c
Date:     2008-06-30 09:08:01 -0700 (Mon, 30 Jun 2008)
Log Message:
-----------
overload-chk2 patch from Mihael; fixes a problem introduced in r2058
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java	2008-06-30 15:05:39 UTC (rev 2064)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java	2008-06-30 16:08:01 UTC (rev 2065)
@@ -11,8 +11,9 @@
 
 import java.text.DecimalFormat;
 import java.text.NumberFormat;
+import java.util.Timer;
+
 import org.apache.log4j.Logger;
-
 import org.globus.cog.karajan.util.BoundContact;
 
 public class WeightedHost implements Comparable {
@@ -70,7 +71,7 @@
 	public final double getTScore() {
 		if (tscore >= 1)
 			return tscore;
-		if (isOverloaded())
+		if (isOverloaded() != 0)
 			return 0;
 		else
 			return 1;
@@ -116,7 +117,7 @@
 
 	public String toString() {
 		return host.toString() + ":" + D4.format(score) + "(" + D4.format(tscore) + "):" + load
-				+ "/" + (int) (maxLoad()) + (isOverloaded() ? " overloaded" : "");
+				+ "/" + (int) (maxLoad()) + " overload: " + isOverloaded();
 	}
 
 	public int compareTo(Object o) {
@@ -139,7 +140,7 @@
 		this.delayedDelta = delayedDelta;
 	}
 
-	public boolean isOverloaded() {
+	public int isOverloaded() {
 		double ml = maxLoad();
 		if (tscore >= 1) {
 			// the site is mostly good. permit 1 or more jobs
@@ -148,7 +149,7 @@
 				logger.debug("In load mode. score = " + score + " tscore = " + tscore + ", maxload="
 						+ ml);
 			}
-			return !(load <= ml);
+			return load <= ml ? 0 : 1;
 		}
 		else {
 			// the site is mostly bad. allow either 1 or 0 jobs
@@ -165,7 +166,7 @@
 						+ ", maxload=" + ml + " delay since last used=" + delay + "ms"
 						+ " permitted delay=" + permittedDelay + "ms overloaded=" + overloaded);
 			}
-			return overloaded;
+			return (int) (delay - permittedDelay);
 		}
 	}
 
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2008-06-30 15:05:39 UTC (rev 2064)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java	2008-06-30 16:08:01 UTC (rev 2065)
@@ -274,7 +274,7 @@
 			Iterator i = s.iterator();
 			while (i.hasNext()) {
 				WeightedHost wh = (WeightedHost) i.next();
-				if (!wh.isOverloaded()) {
+				if (wh.isOverloaded() == 0) {
 					ns.add(wh);
 				}
 			}
@@ -284,7 +284,7 @@
 			Iterator i = s.iterator();
 			while (i.hasNext()) {
 				WeightedHost wh = (WeightedHost) i.next();
-				if (wh.isOverloaded()) {
+				if (wh.isOverloaded() != 0) {
 					i.remove();
 				}
 			}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java	2008-06-30 15:05:39 UTC (rev 2064)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostSet.java	2008-06-30 16:08:01 UTC (rev 2065)
@@ -12,6 +12,8 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.TreeSet;
 
 import org.globus.cog.karajan.util.BoundContact;
@@ -21,8 +23,10 @@
 	private Map weightedHosts;
 	private double sum;
 	private double scoreHighCap;
-	private int overloadedCount;
+	private volatile int overloadedCount;
 
+	private static final Timer timer = new Timer();
+
 	public WeightedHostSet(double scoreHighCap) {
 		init();
 		this.scoreHighCap = scoreHighCap;
@@ -38,34 +42,54 @@
 		scores.add(wh);
 		weightedHosts.put(wh.getHost(), wh);
 		sum += wh.getTScore();
-		overloadedCount += wh.isOverloaded() ? 1 : 0;
+		overloadedCount += checkOverloaded(wh, 1);
 	}
 
 	public void changeScore(WeightedHost wh, double newScore) {
 		scores.remove(wh);
 		sum -= wh.getTScore();
-		overloadedCount -= wh.isOverloaded() ? 1 : 0;
+		overloadedCount += checkOverloaded(wh, -1);
 		wh.setScore(newScore);
 		weightedHosts.put(wh.getHost(), wh);
 		scores.add(wh);
 		sum += wh.getTScore();
-		overloadedCount += wh.isOverloaded() ? 1 : 0;
+		overloadedCount += checkOverloaded(wh, 1);
 	}
 
 	public void changeLoad(WeightedHost wh, int dl) {
-		overloadedCount -= wh.isOverloaded() ? 1 : 0;
+		overloadedCount += checkOverloaded(wh, -1); // dir=-1 already yields a negated delta; add it, as changeScore() and remove() do
 		wh.changeLoad(dl);
-		overloadedCount += wh.isOverloaded() ? 1 : 0;
+		overloadedCount += checkOverloaded(wh, 1);
 	}
 
 	public double remove(WeightedHost wh) {
 		scores.remove(wh);
 		weightedHosts.remove(wh.getHost());
 		sum -= wh.getScore();
-		overloadedCount -= wh.isOverloaded() ? 1 : 0;
+		overloadedCount += checkOverloaded(wh, -1);
 		return wh.getScore();
 	}
 
+	private int checkOverloaded(WeightedHost wh, final int dir) { // returns a delta for overloadedCount; dir > 0 counts wh in, otherwise out
+		int v = wh.isOverloaded();
+		if (v >= 0) { // non-negative result: contribute v itself, signed by dir
+		    if (dir > 0) {
+		    	return v;
+		    }
+		    else {
+		        return -v;
+		    }
+		}
+		else { // negative result: apply dir now and schedule the opposite adjustment
+			timer.schedule(new TimerTask() {
+				public void run() {
+				    overloadedCount -= dir; // NOTE(review): runs on the Timer thread; volatile does not make this read-modify-write atomic
+				}
+			}, -v); // -v is the positive delay handed to Timer.schedule
+			return dir;
+		}
+	}
+
 	public WeightedHost findHost(BoundContact bc) {
 		return (WeightedHost) weightedHosts.get(bc);
 	}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 |