| 
      
      
      From: <ha...@us...> - 2007-02-06 00:56:31
      
     | 
| Revision: 1563
          http://svn.sourceforge.net/cogkit/?rev=1563&view=rev
Author:   hategan
Date:     2007-02-05 16:56:30 -0800 (Mon, 05 Feb 2007)
Log Message:
-----------
added fileop throttling
Modified Paths:
--------------
    trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2007-02-06 00:55:58 UTC (rev 1562)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java	2007-02-06 00:56:30 UTC (rev 1563)
@@ -42,12 +42,19 @@
 import org.globus.cog.karajan.util.VirtualContact;
 
 public abstract class LateBindingScheduler extends AbstractScheduler implements StatusListener {
+	public static final String JOBS_PER_CPU = "jobsPerCPU";
+	public static final String HOST_SUBMIT_THROTTLE = "hostSubmitThrottle";
+	public static final String SUBMIT_THROTTLE = "submitThrottle";
+	public static final String MAX_TRANSFERS = "maxTransfers";
+	public static final String SSH_INITIAL_RATE = "sshInitialRate";
+	public static final String MAX_FILE_OPERATIONS = "maxFileOperations";
 
 	public static final int K = 1024;
 	public static final int THREAD_STACK_SIZE = 192 * K;
 	public static final int DEFAULT_SSH_INITIAL_RATE = 6;
 	public static final int DEFAULT_JOBS_PER_CPU = 128;
 	public static final int DEFAULT_MAX_TRANSFERS = 32;
+	public static final int DEFAULT_MAX_FILE_OPERATIONS = 64;
 
 	private static final Logger logger = Logger.getLogger(LateBindingScheduler.class);
 
@@ -63,7 +70,8 @@
 
 	private final Map handlers, taskContacts;
 
-	private int jobsPerCPU, maxTransfers, currentTransfers, sshInitialRate;
+	private int jobsPerCPU, maxTransfers, currentTransfers, sshInitialRate, maxFileOperations,
+			currentFileOperations;
 
 	private InstanceSubmitQueue submitQueue;
 
@@ -75,6 +83,7 @@
 		taskContacts = new HashMap();
 		jobsPerCPU = DEFAULT_JOBS_PER_CPU;
 		maxTransfers = DEFAULT_MAX_TRANSFERS;
+		maxFileOperations = DEFAULT_MAX_FILE_OPERATIONS;
 		sshInitialRate = DEFAULT_SSH_INITIAL_RATE;
 		handlers = new HashMap();
 		submitQueue = new InstanceSubmitQueue();
@@ -84,7 +93,7 @@
 
 	public Contact allocateContact(Object constraints) throws NoFreeResourceException {
 		if (getResources().size() == 0) {
-			throw new NoFreeResourceException("No service contacts available");
+			throw new NoSuchResourceException("No service contacts available");
 		}
 		Contact contact = new VirtualContact();
 		if (constraints instanceof TaskConstraints) {
@@ -200,6 +209,9 @@
 		if (t.getType() == Task.FILE_TRANSFER && (currentTransfers >= maxTransfers)) {
 			throw new NoFreeResourceException();
 		}
+		else if (t.getType() == Task.FILE_OPERATION && (currentFileOperations >= maxFileOperations)) {
+			throw new NoFreeResourceException();
+		}
 	}
 
 	// make sure there is enough memory to run 8 more threads (approx).
@@ -480,6 +492,9 @@
 			if (t.getType() == Task.FILE_TRANSFER) {
 				currentTransfers++;
 			}
+			else if (t.getType() == Task.FILE_OPERATION) {
+				currentFileOperations++;
+			}
 		}
 	}
 
@@ -504,22 +519,25 @@
 	}
 
 	public void setProperty(String name, Object value) {
-		if (name.equalsIgnoreCase("jobsPerCpu")) {
+		if (name.equalsIgnoreCase(JOBS_PER_CPU)) {
 			logger.debug("Scheduler: setting jobsPerCpu to " + value);
 			jobsPerCPU = TypeUtil.toInt(value);
 		}
-		else if (name.equalsIgnoreCase("submitThrottle")) {
+		else if (name.equalsIgnoreCase(SUBMIT_THROTTLE)) {
 			submitQueue.setThrottle(TypeUtil.toInt(value));
 		}
-		else if (name.equalsIgnoreCase("hostSubmitThrottle")) {
+		else if (name.equalsIgnoreCase(HOST_SUBMIT_THROTTLE)) {
 			submitQueue.setHostThrottle(TypeUtil.toInt(value));
 		}
-		else if (name.equalsIgnoreCase("maxTransfers")) {
+		else if (name.equalsIgnoreCase(MAX_TRANSFERS)) {
 			maxTransfers = TypeUtil.toInt(value);
 		}
-		else if (name.equalsIgnoreCase("sshInitialRate")) {
+		else if (name.equalsIgnoreCase(SSH_INITIAL_RATE)) {
 			sshInitialRate = TypeUtil.toInt(value);
 		}
+		else if (name.equalsIgnoreCase(MAX_FILE_OPERATIONS)) {
+			maxFileOperations = TypeUtil.toInt(value);
+		}
 		else {
 			super.setProperty(name, value);
 		}
@@ -553,6 +571,9 @@
 					if (task.getType() == Task.FILE_TRANSFER) {
 						currentTransfers--;
 					}
+					else if (task.getType() == Task.FILE_OPERATION) {
+						currentFileOperations--;
+					}
 					synchronized (taskContacts) {
 						taskContacts.remove(task);
 					}
@@ -632,8 +653,8 @@
 	public synchronized String[] getPropertyNames() {
 		if (propertyNames == null) {
 			propertyNames = AbstractScheduler.combineNames(super.getPropertyNames(), new String[] {
-					"jobsPerCPU", "hostSubmitThrottle", "submitThrottle", "maxTransfers",
-					"sshInitialRate" });
+					JOBS_PER_CPU, HOST_SUBMIT_THROTTLE, SUBMIT_THROTTLE, MAX_TRANSFERS,
+					SSH_INITIAL_RATE, MAX_FILE_OPERATIONS });
 		}
 		return propertyNames;
 	}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 |