|
From: <ha...@us...> - 2008-02-12 17:43:48
|
Revision: 1899
http://cogkit.svn.sourceforge.net/cogkit/?rev=1899&view=rev
Author: hategan
Date: 2008-02-12 09:43:46 -0800 (Tue, 12 Feb 2008)
Log Message:
-----------
jobThrottle is now a float; the time it takes to submit a job is now part of the score feedback loop. By default it is set to stabilize at a 20s submission time.
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2008-02-12 17:36:49 UTC (rev 1898)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2008-02-12 17:43:46 UTC (rev 1899)
@@ -197,12 +197,21 @@
protected int throttleValue(Object value) {
if ("off".equalsIgnoreCase(value.toString())) {
- return THROTTLE_OFF;
+ return THROTTLE_OFF;
}
else {
return TypeUtil.toInt(value);
}
}
+
+ /**
+ * Converts a throttle setting to a float; the float-returning counterpart
+ * of throttleValue(Object).
+ *
+ * @param value the setting to convert; must not be null, since toString()
+ * is invoked on it
+ * @return THROTTLE_OFF when the setting's string form equals "off"
+ * (case-insensitive); otherwise the result of TypeUtil.toDouble(value)
+ * narrowed to float
+ */
+ protected float floatThrottleValue(Object value) {
+ if ("off".equalsIgnoreCase(value.toString())) {
+ return THROTTLE_OFF;
+ }
+ else {
+ return (float) TypeUtil.toDouble(value);
+ }
+ }
public Object getProperty(String name) {
return properties.get(name);
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java 2008-02-12 17:36:49 UTC (rev 1898)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHost.java 2008-02-12 17:43:46 UTC (rev 1899)
@@ -20,17 +20,17 @@
private double tscore;
private int load;
private double delayedDelta;
- private int jobThrottle;
+ private float jobThrottle;
- public WeightedHost(BoundContact contact, int jobThrottle) {
+ public WeightedHost(BoundContact contact, float jobThrottle) {
this(contact, 0.0, jobThrottle);
}
- public WeightedHost(BoundContact contact, double score, int jobThrottle) {
+ public WeightedHost(BoundContact contact, double score, float jobThrottle) {
this(contact, score, 0, jobThrottle);
}
- public WeightedHost(BoundContact contact, double score, int load, int jobThrottle) {
+ public WeightedHost(BoundContact contact, double score, int load, float jobThrottle) {
this.host = contact;
setScore(score);
this.load = load;
@@ -129,7 +129,7 @@
return !(load < jobThrottle * tscore + 2);
}
- public int getJobThrottle() {
+ public float getJobThrottle() {
return jobThrottle;
}
}
\ No newline at end of file
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2008-02-12 17:36:49 UTC (rev 1898)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/WeightedHostScoreScheduler.java 2008-02-12 17:43:46 UTC (rev 1899)
@@ -10,6 +10,7 @@
package org.globus.cog.karajan.scheduler;
import java.lang.reflect.Field;
+import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
@@ -41,6 +42,7 @@
public static final String SCORE_HIGH_CAP = "scoreHighCap";
public static final String POLICY = "policy";
public static final String JOB_THROTTLE = "jobThrottle";
+ public static final String MAX_SUBMISSION_TIME = "maxSubmissionTime";
private WeightedHostSet sorted;
private int policy;
@@ -50,10 +52,12 @@
*/
private double connectionRefusedFactor, connectionTimeoutFactor, jobSubmissionTaskLoadFactor,
transferTaskLoadFactor, fileOperationTaskLoadFactor, successFactor, failureFactor,
- scoreHighCap;
+ scoreHighCap, maxSubmissionTime;
- private int jobThrottle;
+ private float jobThrottle;
+ private double submissionTimeBias, submissionTimeFactor;
+
private boolean change;
private TaskConstraints cachedConstraints;
private boolean cachedLoadState;
@@ -74,8 +78,24 @@
failureFactor = -0.5;
scoreHighCap = 100;
jobThrottle = 2;
+ maxSubmissionTime = 20;
+ updateInternal();
}
+ // This isn't very accurate since it varies by provider
+ // and with submission parallelism
+ // What it means is that if submission time is exactly this much
+ // then a success should increase the score by successFactor
+ public final double BASE_SUBMISSION_TIME = 0.5;
+
+ /**
+ * Recomputes the linear submission-time score model from successFactor,
+ * maxSubmissionTime and BASE_SUBMISSION_TIME.
+ *
+ * The resulting delta, submissionTimeBias + submissionTimeFactor * t
+ * (t in seconds), is 0 when t equals BASE_SUBMISSION_TIME and
+ * -successFactor when t equals maxSubmissionTime.
+ *
+ * @throws IllegalArgumentException if maxSubmissionTime is not strictly
+ * greater than BASE_SUBMISSION_TIME (including NaN)
+ */
+ protected void updateInternal() {
+ // The check is written as !(a > b) so that equality and NaN are rejected too:
+ // equality would divide by zero below and turn the factor and bias into
+ // Infinity/NaN, which would then poison every host score.
+ if (!(maxSubmissionTime > BASE_SUBMISSION_TIME)) {
+ throw new IllegalArgumentException("maxSubmissionTime must be > " + BASE_SUBMISSION_TIME);
+ }
+ submissionTimeFactor = -successFactor / (maxSubmissionTime - BASE_SUBMISSION_TIME);
+ submissionTimeBias = -BASE_SUBMISSION_TIME * submissionTimeFactor;
+ }
+
public void setResources(ContactSet grid) {
super.setResources(grid);
if (grid.getContacts() == null) {
@@ -92,7 +112,7 @@
sorted.add(wh);
}
- protected synchronized void multiplyScore(WeightedHost wh, double factor) {
+ protected synchronized void factorScore(WeightedHost wh, double factor) {
double score = wh.getScore();
if (logger.isDebugEnabled()) {
logger.debug("multiplyScore(" + wh + ", " + factor + ")");
@@ -105,7 +125,7 @@
}
}
- protected synchronized void multiplyScoreLater(WeightedHost wh, double factor) {
+ protected synchronized void factorScoreLater(WeightedHost wh, double factor) {
wh.setDelayedDelta(wh.getDelayedDelta() + factor);
}
@@ -186,13 +206,11 @@
if (logger.isDebugEnabled()) {
logger.debug("Next contact: " + selected);
}
-
+
sorted.changeLoad(selected, 1);
selected.setDelayedDelta(successFactor);
return selected.getHost();
}
-
-
public synchronized void releaseContact(Contact contact) {
if (logger.isDebugEnabled()) {
@@ -263,7 +281,7 @@
private static final String[] myPropertyNames = new String[] { POLICY,
FACTOR_CONNECTION_REFUSED, FACTOR_CONNECTION_TIMEOUT, FACTOR_SUBMISSION_TASK_LOAD,
FACTOR_TRANSFER_TASK_LOAD, FACTOR_FILEOP_TASK_LOAD, FACTOR_FAILURE, FACTOR_SUCCESS,
- SCORE_HIGH_CAP, JOB_THROTTLE };
+ SCORE_HIGH_CAP, JOB_THROTTLE, MAX_SUBMISSION_TIME };
private static Set propertyNamesSet;
static {
@@ -298,7 +316,7 @@
}
}
else if (JOB_THROTTLE.equals(name)) {
- jobThrottle = throttleValue(value);
+ jobThrottle = floatThrottleValue(value);
}
else {
double val = TypeUtil.toDouble(value);
@@ -315,6 +333,7 @@
throw new KarajanRuntimeException("Failed to set property '" + name + "'", e);
}
}
+ updateInternal();
}
else {
super.setProperty(name, value);
@@ -337,6 +356,8 @@
return;
}
+ checkSubmissionTime(code, e.getStatus(), t, contacts);
+
if (code == Status.SUBMITTED) {
// this isn't reliable
// factorSubmission(t, contacts, 1);
@@ -368,6 +389,33 @@
}
}
+ public static final String TASK_ATTR_SUBMISSION_TIME = "scheduler:submissionTime";
+
+ /**
+ * Measures how long a job submission took and feeds it into the host scores.
+ *
+ * Only tasks of type Task.JOB_SUBMISSION are considered. When the status
+ * code is SUBMITTING, the status timestamp is stored on the task under
+ * TASK_ATTR_SUBMISSION_TIME. On any other status code, if a start timestamp
+ * is stored, the elapsed time is converted into a score delta and applied
+ * to the given contacts through factorMultiple.
+ *
+ * @param code the status code of the event being processed
+ * @param s the status whose timestamp is used as start or end time
+ * @param t the task the status belongs to; also used as the lock
+ * @param contacts the contacts whose scores receive the delta
+ */
+ private void checkSubmissionTime(int code, Status s, Task t, Contact[] contacts) {
+ synchronized (t) {
+ if (t.getType() == Task.JOB_SUBMISSION) {
+ if (code == Status.SUBMITTING) {
+ // remember when the submission started
+ t.setAttribute(TASK_ATTR_SUBMISSION_TIME, s.getTime());
+ }
+ else {
+ Date st = (Date) t.getAttribute(TASK_ATTR_SUBMISSION_TIME);
+ if (st != null) {
+ Date st2 = s.getTime();
+ // Date.getTime() yields milliseconds, so this difference is in ms
+ long submissionTime = st2.getTime() - st.getTime();
+ // reset the attribute; presumably this makes later status changes
+ // find no start time, so a submission is only scored once
+ t.setAttribute(TASK_ATTR_SUBMISSION_TIME, null);
+ // the product is a double, so dividing by 1000 (ms -> s) is
+ // floating-point division and keeps the fractional seconds
+ double delta = submissionTimeBias + submissionTimeFactor * submissionTime
+ / 1000;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Submission time for " + t + ": " + submissionTime
+ + "ms. Score delta: " + delta);
+ }
+ factorMultiple(contacts, delta);
+ }
+ }
+ }
+ }
+ }
+
private void factorSubmission(Task t, Contact[] contacts, int exp) {
// I wonder where the line between abstraction and obfuscation is...
if (t.getType() == Task.JOB_SUBMISSION) {
@@ -398,7 +446,7 @@
BoundContact bc = (BoundContact) contacts[i];
WeightedHost wh = sorted.findHost(bc);
if (wh != null) {
- multiplyScore(wh, factor);
+ factorScore(wh, factor);
}
}
}
@@ -408,7 +456,7 @@
BoundContact bc = (BoundContact) contacts[i];
WeightedHost wh = sorted.findHost(bc);
if (wh != null) {
- multiplyScoreLater(wh, factor);
+ factorScoreLater(wh, factor);
}
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|