| 
     
      
      
      From: <ha...@us...> - 2007-09-13 21:18:34
       
   | 
Revision: 1738
          http://cogkit.svn.sourceforge.net/cogkit/?rev=1738&view=rev
Author:   hategan
Date:     2007-09-13 14:18:23 -0700 (Thu, 13 Sep 2007)
Log Message:
-----------
added a fixed rate queue
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/HostSubmitQueue.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/InstanceSubmitQueue.java
Added Paths:
-----------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/FixedRateQueue.java
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/FixedRateQueue.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/FixedRateQueue.java	                        (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/FixedRateQueue.java	2007-09-13 21:18:23 UTC (rev 1738)
@@ -0,0 +1,59 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 20, 2006
+ */
+package org.globus.cog.karajan.scheduler.submitQueue;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+public class FixedRateQueue extends AbstractSubmitQueue {
+	private long lastSubmit, delay;
+	private static Timer timer;
+
+
+	public FixedRateQueue(long delay) {
+		super(Math.max(1, (int) (delay/1000)));
+		this.delay = delay;
+	}
+	
+	public FixedRateQueue(double rate) {
+		super(Math.max(1, (int) rate));
+		this.delay = (long) (1000 / rate);
+	}
+
+	private static synchronized Timer getTimer() {
+		if (timer == null) {
+			timer = new Timer(true);
+		}
+		return timer;
+	}
+
+	protected void step() {
+		NonBlockingSubmit nbs = null;
+		synchronized (this) {
+			if (!isQueueEmpty()) {
+				long time = System.currentTimeMillis();
+				if (time - lastSubmit > delay) {
+					nbs = poll();
+					lastSubmit = time;
+				}
+				else {
+					getTimer().schedule(new TimerTask() {
+						public void run() {
+							step();
+						}
+					}, delay - time + lastSubmit);
+				}
+			}
+		}
+		if (nbs != null) {
+			nbs.nextQueue();
+		}
+	}
+}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/HostSubmitQueue.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/HostSubmitQueue.java	2007-09-10 15:46:32 UTC (rev 1737)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/HostSubmitQueue.java	2007-09-13 21:18:23 UTC (rev 1738)
@@ -12,11 +12,16 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import org.globus.cog.karajan.util.BoundContact;
+
 public class HostSubmitQueue extends AbstractSubmitQueue {
 	private Map providerQueues;
+	private BoundContact contact;
+	private FixedRateQueue rlq;
 
-	public HostSubmitQueue(int throttle) {
+	public HostSubmitQueue(BoundContact contact, int throttle) {
 		super(throttle);
+		this.contact = contact;
 	}
 
 	private static final NullQueue NULL_QUEUE = new NullQueue();
@@ -33,8 +38,13 @@
 				sq = new RateLimiterQueue(initialRate, maxRetries, errorRegexp);
 				providerQueues.put(provider, sq);
 			}
-
 		}
+		else if (contact.getProperty("maxSubmitRate") != null) {
+			if (rlq == null) {
+				rlq = new FixedRateQueue(Double.parseDouble(contact.getProperty("maxSubmitRate").toString()));
+			}
+			sq = rlq;
+		}
 		else {
 			sq = NULL_QUEUE;
 		}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/InstanceSubmitQueue.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/InstanceSubmitQueue.java	2007-09-10 15:46:32 UTC (rev 1737)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/InstanceSubmitQueue.java	2007-09-13 21:18:23 UTC (rev 1738)
@@ -27,7 +27,7 @@
 		synchronized (queues) {
 			HostSubmitQueue hq = (HostSubmitQueue) queues.get(contact);
 			if (hq == null) {
-				hq = new HostSubmitQueue(hostThrottle);
+				hq = new HostSubmitQueue(contact, hostThrottle);
 				queues.put(contact, hq);
 			}
 			return hq;
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 | 
| 
     
      
      
      From: <ha...@us...> - 2008-02-12 17:36:59
       
   | 
Revision: 1898
          http://cogkit.svn.sourceforge.net/cogkit/?rev=1898&view=rev
Author:   hategan
Date:     2008-02-12 09:36:49 -0800 (Tue, 12 Feb 2008)
Log Message:
-----------
some logging updates
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/HostSubmitQueue.java
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/RateLimiterQueue.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/HostSubmitQueue.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/HostSubmitQueue.java	2008-02-12 17:36:02 UTC (rev 1897)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/HostSubmitQueue.java	2008-02-12 17:36:49 UTC (rev 1898)
@@ -35,7 +35,7 @@
 		if ("ssh".equalsIgnoreCase(provider)) {
 			sq = (SubmitQueue) providerQueues.get(provider.toLowerCase());
 			if (sq == null) {
-				sq = new RateLimiterQueue(initialRate, maxRetries, errorRegexp);
+				sq = new RateLimiterQueue(initialRate, maxRetries, errorRegexp, contact);
 				providerQueues.put(provider, sq);
 			}
 		}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/RateLimiterQueue.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/RateLimiterQueue.java	2008-02-12 17:36:02 UTC (rev 1897)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/submitQueue/RateLimiterQueue.java	2008-02-12 17:36:49 UTC (rev 1898)
@@ -12,23 +12,30 @@
 import java.util.Timer;
 import java.util.TimerTask;
 
+import org.apache.log4j.Logger;
 import org.globus.cog.abstraction.impl.common.StatusEvent;
 import org.globus.cog.abstraction.interfaces.Status;
 import org.globus.cog.abstraction.interfaces.StatusListener;
+import org.globus.cog.karajan.util.BoundContact;
 
 public class RateLimiterQueue extends AbstractSubmitQueue implements StatusListener {
+	public static final Logger logger = Logger.getLogger(RateLimiterQueue.class);
+
 	public static final int DEFAULT_MAX_RETRIES = 2;
 
 	private long lastSubmit, delay;
 	private static Timer timer;
 	private String errorRegexp;
 	private int maxRetries = DEFAULT_MAX_RETRIES;
+	private BoundContact contact;
 
-	public RateLimiterQueue(int initialRate, int maxRetries, String errorRegexp) {
+	public RateLimiterQueue(int initialRate, int maxRetries, String errorRegexp,
+			BoundContact contact) {
 		super(initialRate);
 		setRate(initialRate);
 		this.errorRegexp = errorRegexp;
 		this.maxRetries = maxRetries;
+		this.contact = contact;
 	}
 
 	public void setRate(int rate) {
@@ -96,7 +103,9 @@
 			if (getThrottle() > 2) {
 				setRate(getThrottle() - 1);
 			}
-			System.err.println("New rate: " + getThrottle() + " S/s");
+			if (logger.isDebugEnabled()) {
+				logger.debug("New rate for \"" + contact + "\": " + getThrottle() + " S/s");
+			}
 		}
 	}
 }
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 |