|
From: <ha...@us...> - 2006-11-22 20:37:32
|
Revision: 1414
http://svn.sourceforge.net/cogkit/?rev=1414&view=rev
Author: hategan
Date: 2006-11-22 12:37:31 -0800 (Wed, 22 Nov 2006)
Log Message:
-----------
fixed properties handling and other things
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-11-22 20:36:56 UTC (rev 1413)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-11-22 20:37:31 UTC (rev 1414)
@@ -9,12 +9,15 @@
*/
package org.globus.cog.karajan.scheduler;
+import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.common.StatusEvent;
+import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
+import org.globus.cog.abstraction.interfaces.Service;
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.karajan.util.BoundContact;
@@ -45,32 +48,32 @@
private int policy;
private int delay;
+ /*
+ * These field names must match the property names
+ */
+ private double connectionRefusedFactor, connectionTimeoutFactor, jobSubmissionTaskLoadFactor,
+ transferTaskLoadFactor, fileOperationTaskLoadFactor, successFactor, failureFactor,
+ scoreHighCap, scoreLowCap;
+ private int normalizationDelay;
+
public WeightedHostScoreScheduler() {
policy = POLICY_WEIGHTED_RANDOM;
setDefaultFactors();
}
protected final void setDefaultFactors() {
- setFactor(FACTOR_CONNECTION_REFUSED, 0.1);
- setFactor(FACTOR_CONNECTION_TIMEOUT, 0.05);
- setFactor(FACTOR_SUBMISSION_TASK_LOAD, 0.9);
- setFactor(FACTOR_TRANSFER_TASK_LOAD, 0.9);
- setFactor(FACTOR_FILEOP_TASK_LOAD, 0.95);
- setFactor(FACTOR_SUCCESS, 1.2);
- setFactor(FACTOR_FAILURE, 0.9);
- setFactor(SCORE_HIGH_CAP, 100);
- setFactor(SCORE_LOW_CAP, 0.001);
- setFactor(NORMALIZATION_DELAY, 100);
+ connectionRefusedFactor = 0.1;
+ connectionTimeoutFactor = 0.05;
+ jobSubmissionTaskLoadFactor = 0.9;
+ transferTaskLoadFactor = 0.9;
+ fileOperationTaskLoadFactor = 0.95;
+ successFactor = 1.01;
+ failureFactor = 0.9;
+ scoreHighCap = 100;
+ scoreLowCap = 0.01;
+ normalizationDelay = 100;
}
- protected final void setFactor(String name, double value) {
- setProperty(name, new Double(value));
- }
-
- protected double getFactor(String name) {
- return ((Double) getProperty(name)).doubleValue();
- }
-
public void setResources(ContactSet grid) {
super.setResources(grid);
sorted = new WeightedHostSet();
@@ -97,11 +100,11 @@
}
protected double checkCaps(double score) {
- if (score > getFactor(SCORE_HIGH_CAP)) {
- return getFactor(SCORE_HIGH_CAP);
+ if (score > scoreHighCap) {
+ return scoreHighCap;
}
- else if (score < getFactor(SCORE_LOW_CAP)) {
- return getFactor(SCORE_LOW_CAP);
+ else if (score < scoreLowCap) {
+ return scoreLowCap;
}
else {
return score;
@@ -112,11 +115,11 @@
throws NoFreeResourceException {
checkGlobalLoadConditions();
BoundContact contact;
-
+
WeightedHostSet s = sorted;
-
+
s = constrain(s, getConstraintChecker(), t);
-
+
double sum = s.getSum();
if (policy == POLICY_WEIGHTED_RANDOM) {
double rand = Math.random() * sum;
@@ -125,7 +128,7 @@
logger.debug("Rand: " + rand + ", sum: " + sum);
}
Iterator i = s.iterator();
-
+
while (i.hasNext()) {
WeightedHost wh = (WeightedHost) i.next();
sum += wh.getScore();
@@ -149,7 +152,8 @@
return contact;
}
- protected WeightedHostSet constrain(WeightedHostSet s, ResourceConstraintChecker rcc, TaskConstraints tc) {
+ protected WeightedHostSet constrain(WeightedHostSet s, ResourceConstraintChecker rcc,
+ TaskConstraints tc) {
if (rcc == null) {
return s;
}
@@ -168,7 +172,7 @@
protected void normalize() {
delay++;
- if (delay > getFactor(NORMALIZATION_DELAY)) {
+ if (delay > normalizationDelay) {
if (logger.isDebugEnabled()) {
logger.debug("Normalizing...");
logger.debug("Before normalization: " + sorted);
@@ -220,57 +224,87 @@
}
}
else {
- super.setProperty(name, new Double(TypeUtil.toDouble(value)));
+ double val = TypeUtil.toDouble(value);
+ try {
+ Field f = this.getClass().getField(name);
+ if (f.getClass().equals(int.class)) {
+ f.setInt(this, (int) val);
+ }
+ else {
+ f.setDouble(this, val);
+ }
+ }
+ catch (Exception e) {
+ throw new KarajanRuntimeException("Failed to set property '" + name + "'", e);
+ }
}
}
+ else {
+ super.setProperty(name, value);
+ }
}
+ public void submitBoundToServices(Task t, Contact[] contacts, Service[] services)
+ throws TaskSubmissionException {
+ factorSubmission(t, contacts, 1);
+ super.submitBoundToServices(t, contacts, services);
+ }
+
public void statusChanged(StatusEvent e) {
- Task t = (Task) e.getSource();
- int code = e.getStatus().getStatusCode();
- Contact[] contacts = getContacts(t);
- if (code == Status.SUBMITTED) {
- factorSubmission(t, contacts, 1);
- }
- else if (code == Status.COMPLETED) {
- factorSubmission(t, contacts, -1);
- factorMultiple(contacts, getFactor(FACTOR_SUCCESS));
- }
- else if (code == Status.FAILED) {
- factorMultiple(contacts, getFactor(FACTOR_FAILURE));
- Exception ex = e.getStatus().getException();
- if (ex != null) {
- String exs = ex.toString();
- if (exs.indexOf("Connection refused") >= 0 || exs.indexOf("connection refused") >= 0) {
- factorMultiple(contacts, getFactor(FACTOR_CONNECTION_REFUSED));
+ try {
+ Task t = (Task) e.getSource();
+ int code = e.getStatus().getStatusCode();
+ Contact[] contacts = getContacts(t);
+ if (code == Status.SUBMITTED) {
+ // this isn't reliable
+ // factorSubmission(t, contacts, 1);
+ }
+ else if (code == Status.COMPLETED) {
+ factorSubmission(t, contacts, -1);
+ factorMultiple(contacts, successFactor);
+ }
+ else if (code == Status.FAILED) {
+ factorMultiple(contacts, failureFactor);
+ Exception ex = e.getStatus().getException();
+ if (ex != null) {
+ String exs = ex.toString();
+ if (exs.indexOf("Connection refused") >= 0
+ || exs.indexOf("connection refused") >= 0) {
+ factorMultiple(contacts, connectionRefusedFactor);
+ }
+ else if (exs.indexOf("timeout") >= 0) {
+ factorMultiple(contacts, connectionTimeoutFactor);
+ }
}
- else if (exs.indexOf("timeout") >= 0) {
- factorMultiple(contacts, getFactor(FACTOR_CONNECTION_TIMEOUT));
- }
}
}
- super.statusChanged(e);
+ catch (Exception ex) {
+ logger.warn("Scheduler threw exception while processing task status change", ex);
+ }
+ finally {
+ super.statusChanged(e);
+ }
}
private void factorSubmission(Task t, Contact[] contacts, int exp) {
// I wonder where the line between abstraction and obfuscation is...
if (t.getType() == Task.JOB_SUBMISSION) {
- factorMultiple(contacts, spow(getFactor(FACTOR_SUBMISSION_TASK_LOAD), exp));
+ factorMultiple(contacts, spow(jobSubmissionTaskLoadFactor, exp));
}
else if (t.getType() == Task.FILE_TRANSFER) {
- factorMultiple(contacts, spow(getFactor(FACTOR_TRANSFER_TASK_LOAD), exp));
+ factorMultiple(contacts, spow(transferTaskLoadFactor, exp));
}
else if (t.getType() == Task.FILE_OPERATION) {
- factorMultiple(contacts, spow(getFactor(FACTOR_FILEOP_TASK_LOAD), exp));
+ factorMultiple(contacts, spow(fileOperationTaskLoadFactor, exp));
}
}
-
+
private double spow(double x, int exp) {
if (exp == 1) {
return x;
}
else if (exp == -1) {
- return 1/x;
+ return 1 / x;
}
else {
throw new IllegalArgumentException();
@@ -279,7 +313,7 @@
private void factorMultiple(Contact[] contacts, double factor) {
for (int i = 0; i < contacts.length; i++) {
- WeightedHost wh = new WeightedHost((BoundContact) contacts[i]);
+ WeightedHost wh = sorted.findHost((BoundContact) contacts[i]);
multiplyScore(wh, factor);
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2006-11-22 21:45:28
|
Revision: 1419
http://svn.sourceforge.net/cogkit/?rev=1419&view=rev
Author: hategan
Date: 2006-11-22 13:45:19 -0800 (Wed, 22 Nov 2006)
Log Message:
-----------
not all contacts may be known to the scheduler
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-11-22 20:57:38 UTC (rev 1418)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-11-22 21:45:19 UTC (rev 1419)
@@ -313,9 +313,11 @@
private void factorMultiple(Contact[] contacts, double factor) {
for (int i = 0; i < contacts.length; i++) {
- WeightedHost wh = sorted.findHost((BoundContact) contacts[i]);
- multiplyScore(wh, factor);
+ BoundContact bc = (BoundContact) contacts[i];
+ WeightedHost wh = sorted.findHost(bc);
+ if (wh != null) {
+ multiplyScore(wh, factor);
+ }
}
}
-
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2006-11-22 23:00:05
|
Revision: 1420
http://svn.sourceforge.net/cogkit/?rev=1420&view=rev
Author: hategan
Date: 2006-11-22 14:59:59 -0800 (Wed, 22 Nov 2006)
Log Message:
-----------
oops
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-11-22 21:45:19 UTC (rev 1419)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-11-22 22:59:59 UTC (rev 1420)
@@ -128,7 +128,8 @@
logger.debug("Rand: " + rand + ", sum: " + sum);
}
Iterator i = s.iterator();
-
+
+ sum = 0;
while (i.hasNext()) {
WeightedHost wh = (WeightedHost) i.next();
sum += wh.getScore();
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2006-12-07 18:15:39
|
Revision: 1451
http://svn.sourceforge.net/cogkit/?rev=1451&view=rev
Author: hategan
Date: 2006-12-07 10:15:36 -0800 (Thu, 07 Dec 2006)
Log Message:
-----------
what on earth was I thinking?
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-12-07 17:28:14 UTC (rev 1450)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2006-12-07 18:15:36 UTC (rev 1451)
@@ -217,7 +217,7 @@
return propertyNames;
}
- public final void setProperty(String name, Object value) {
+ public void setProperty(String name, Object value) {
if (propertyNamesSet.contains(name)) {
if (POLICY.equals(name)) {
if (value instanceof String) {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2007-01-06 20:53:20
|
Revision: 1533
http://svn.sourceforge.net/cogkit/?rev=1533&view=rev
Author: hategan
Date: 2007-01-06 12:53:17 -0800 (Sat, 06 Jan 2007)
Log Message:
-----------
tweaked factors; added overload caching
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2007-01-06 20:52:19 UTC (rev 1532)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2007-01-06 20:53:17 UTC (rev 1533)
@@ -54,6 +54,11 @@
private int jobThrottle;
+ private boolean change;
+ private TaskConstraints cachedConstraints;
+ private boolean cachedLoadState;
+ private int hits;
+
public WeightedHostScoreScheduler() {
policy = POLICY_WEIGHTED_RANDOM;
setDefaultFactors();
@@ -65,8 +70,8 @@
jobSubmissionTaskLoadFactor = -0.2;
transferTaskLoadFactor = -0.2;
fileOperationTaskLoadFactor = -0.01;
- successFactor = 1;
- failureFactor = -0.1;
+ successFactor = 0.1;
+ failureFactor = -0.5;
scoreHighCap = 100;
jobThrottle = 2;
}
@@ -76,7 +81,7 @@
sorted = new WeightedHostSet(scoreHighCap);
Iterator i = grid.getContacts().iterator();
while (i.hasNext()) {
- addToSorted(new WeightedHost((BoundContact) i.next()));
+ addToSorted(new WeightedHost((BoundContact) i.next(), jobThrottle));
}
}
@@ -92,7 +97,8 @@
double ns = factor(score, factor);
sorted.changeScore(wh, ns);
if (logger.isDebugEnabled()) {
- logger.debug("Old score: " + score + ", new score: " + ns);
+ logger.debug("Old score: " + WeightedHost.D4.format(score) + ", new score: "
+ + WeightedHost.D4.format(ns));
}
}
@@ -107,16 +113,36 @@
protected synchronized BoundContact getNextContact(TaskConstraints t)
throws NoFreeResourceException {
checkGlobalLoadConditions();
+
+ if (!change && cachedLoadState && cachedConstraints.equals(t)) {
+ hits++;
+ throw new NoFreeResourceException();
+ }
+
BoundContact contact;
WeightedHostSet s = sorted;
WeightedHost selected = null;
+ if (s.allOverloaded()) {
+ throw new NoFreeResourceException();
+ }
+
s = constrain(s, getConstraintChecker(), t);
if (s.isEmpty()) {
throw new NoSuchResourceException();
}
+ else if (s.allOverloaded()) {
+ change = false;
+ cachedLoadState = true;
+ cachedConstraints = t;
+ hits = 0;
+ throw new NoFreeResourceException();
+ }
+ else {
+ cachedLoadState = false;
+ }
s = removeOverloaded(s);
@@ -157,7 +183,7 @@
if (logger.isDebugEnabled()) {
logger.debug("Next contact: " + selected.getHost());
}
- selected.changeLoad(1);
+ sorted.changeLoad(selected, 1);
selected.setDelayedDelta(successFactor);
return selected.getHost();
}
@@ -169,7 +195,8 @@
super.releaseContact(contact);
WeightedHost wh = sorted.findHost(contact);
if (wh != null) {
- wh.changeLoad(-1);
+ change = true;
+ sorted.changeLoad(wh, -1);
sorted.changeScore(wh, wh.getScore() + wh.getDelayedDelta());
}
else {
@@ -201,7 +228,7 @@
Iterator i = s.iterator();
while (i.hasNext()) {
WeightedHost wh = (WeightedHost) i.next();
- if (!overloaded(wh)) {
+ if (!wh.isOverloaded()) {
ns.add(wh);
}
}
@@ -211,7 +238,7 @@
Iterator i = s.iterator();
while (i.hasNext()) {
WeightedHost wh = (WeightedHost) i.next();
- if (overloaded(wh)) {
+ if (wh.isOverloaded()) {
i.remove();
}
}
@@ -219,12 +246,6 @@
}
}
- protected boolean overloaded(WeightedHost wh) {
- double score = wh.getTScore();
- int load = wh.getLoad();
- return !(load < jobThrottle * score + 2);
- }
-
private static String[] propertyNames;
private static final String[] myPropertyNames = new String[] { POLICY,
FACTOR_CONNECTION_REFUSED, FACTOR_CONNECTION_TIMEOUT, FACTOR_SUBMISSION_TASK_LOAD,
@@ -298,11 +319,11 @@
Task t = (Task) e.getSource();
int code = e.getStatus().getStatusCode();
Contact[] contacts = getContacts(t);
-
+
if (contacts == null) {
return;
}
-
+
if (code == Status.SUBMITTED) {
// this isn't reliable
// factorSubmission(t, contacts, 1);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2007-02-13 18:19:19
|
Revision: 1580
http://svn.sourceforge.net/cogkit/?rev=1580&view=rev
Author: hategan
Date: 2007-02-13 10:19:18 -0800 (Tue, 13 Feb 2007)
Log Message:
-----------
fixed NPE when no resources are specified
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2007-02-13 01:09:16 UTC (rev 1579)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2007-02-13 18:19:18 UTC (rev 1580)
@@ -78,6 +78,9 @@
public void setResources(ContactSet grid) {
super.setResources(grid);
+ if (grid.getContacts() == null) {
+ return;
+ }
sorted = new WeightedHostSet(scoreHighCap);
Iterator i = grid.getContacts().iterator();
while (i.hasNext()) {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-05-26 17:28:50
|
Revision: 2027
http://cogkit.svn.sourceforge.net/cogkit/?rev=2027&view=rev
Author: hategan
Date: 2008-05-26 10:28:46 -0700 (Mon, 26 May 2008)
Log Message:
-----------
fixed setting properties from subclasses
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2008-05-23 18:41:17 UTC (rev 2026)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2008-05-26 17:28:46 UTC (rev 2027)
@@ -335,7 +335,7 @@
else {
double val = TypeUtil.toDouble(value);
try {
- Field f = this.getClass().getField(name);
+ Field f = WeightedHostScoreScheduler.class.getDeclaredField(name);
if (f.getClass().equals(int.class)) {
f.setInt(this, (int) val);
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|