|
From: <ha...@us...> - 2007-09-13 21:18:34
|
Revision: 1738
http://cogkit.svn.sourceforge.net/cogkit/?rev=1738&view=rev
Author: hategan
Date: 2007-09-13 14:18:23 -0700 (Thu, 13 Sep 2007)
Log Message:
-----------
added a fixed rate queue
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/HostSubmitQueue.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/InstanceSubmitQueue.java
Added Paths:
-----------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/FixedRateQueue.java
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/FixedRateQueue.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/FixedRateQueue.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/FixedRateQueue.java 2007-09-13 21:18:23 UTC (rev 1738)
@@ -0,0 +1,59 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 20, 2006
+ */
+package org.globus.cog.karajan.scheduler.submitQueue;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+public class FixedRateQueue extends AbstractSubmitQueue {
+ private long lastSubmit, delay;
+ private static Timer timer;
+
+
+ public FixedRateQueue(long delay) {
+ super(Math.max(1, (int) (delay/1000)));
+ this.delay = delay;
+ }
+
+ public FixedRateQueue(double rate) {
+ super(Math.max(1, (int) rate));
+ this.delay = (long) (1000 / rate);
+ }
+
+ private static synchronized Timer getTimer() {
+ if (timer == null) {
+ timer = new Timer(true);
+ }
+ return timer;
+ }
+
+ protected void step() {
+ NonBlockingSubmit nbs = null;
+ synchronized (this) {
+ if (!isQueueEmpty()) {
+ long time = System.currentTimeMillis();
+ if (time - lastSubmit > delay) {
+ nbs = poll();
+ lastSubmit = time;
+ }
+ else {
+ getTimer().schedule(new TimerTask() {
+ public void run() {
+ step();
+ }
+ }, delay - time + lastSubmit);
+ }
+ }
+ }
+ if (nbs != null) {
+ nbs.nextQueue();
+ }
+ }
+}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/HostSubmitQueue.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/HostSubmitQueue.java 2007-09-10 15:46:32 UTC (rev 1737)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/HostSubmitQueue.java 2007-09-13 21:18:23 UTC (rev 1738)
@@ -12,11 +12,16 @@
import java.util.HashMap;
import java.util.Map;
+import org.globus.cog.karajan.util.BoundContact;
+
public class HostSubmitQueue extends AbstractSubmitQueue {
private Map providerQueues;
+ private BoundContact contact;
+ private FixedRateQueue rlq;
- public HostSubmitQueue(int throttle) {
+ public HostSubmitQueue(BoundContact contact, int throttle) {
super(throttle);
+ this.contact = contact;
}
private static final NullQueue NULL_QUEUE = new NullQueue();
@@ -33,8 +38,13 @@
sq = new RateLimiterQueue(initialRate, maxRetries, errorRegexp);
providerQueues.put(provider, sq);
}
-
}
+ else if (contact.getProperty("maxSubmitRate") != null) {
+ if (rlq == null) {
+ rlq = new FixedRateQueue(Double.parseDouble(contact.getProperty("maxSubmitRate").toString()));
+ }
+ sq = rlq;
+ }
else {
sq = NULL_QUEUE;
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/InstanceSubmitQueue.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/InstanceSubmitQueue.java 2007-09-10 15:46:32 UTC (rev 1737)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/InstanceSubmitQueue.java 2007-09-13 21:18:23 UTC (rev 1738)
@@ -27,7 +27,7 @@
synchronized (queues) {
HostSubmitQueue hq = (HostSubmitQueue) queues.get(contact);
if (hq == null) {
- hq = new HostSubmitQueue(hostThrottle);
+ hq = new HostSubmitQueue(contact, hostThrottle);
queues.put(contact, hq);
}
return hq;
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|