|
From: <ha...@us...> - 2007-01-06 20:55:02
|
Revision: 1534
http://svn.sourceforge.net/cogkit/?rev=1534&view=rev
Author: hategan
Date: 2007-01-06 12:55:00 -0800 (Sat, 06 Jan 2007)
Log Message:
-----------
use new queue; better (?) performance; fixed (?) out of order status events with tasks warning
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2007-01-06 20:53:17 UTC (rev 1533)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/LateBindingScheduler.java 2007-01-06 20:55:00 UTC (rev 1534)
@@ -36,6 +36,7 @@
import org.globus.cog.karajan.scheduler.submitQueue.SubmitQueue;
import org.globus.cog.karajan.util.BoundContact;
import org.globus.cog.karajan.util.Contact;
+import org.globus.cog.karajan.util.Queue;
import org.globus.cog.karajan.util.TaskHandlerWrapper;
import org.globus.cog.karajan.util.TypeUtil;
import org.globus.cog.karajan.util.VirtualContact;
@@ -66,6 +67,8 @@
private InstanceSubmitQueue submitQueue;
+ private boolean tasksFinished;
+
public LateBindingScheduler() {
virtualContacts = new HashMap();
executionHandlers = new HashMap();
@@ -94,8 +97,9 @@
return allocateContact(null);
}
- public void releaseContact(BoundContact contact) {
+ public synchronized void releaseContact(BoundContact contact) {
virtualContacts.remove(contact.getHost());
+ tasksFinished = true;
}
public synchronized BoundContact resolveVirtualContact(Task t, Contact contact)
@@ -153,13 +157,13 @@
protected TaskConstraints getTaskConstraints(Task t) {
Object constraints = super.getConstraints(t);
- if (constraints instanceof Contact[]) {
- Contact[] c = (Contact[]) constraints;
- if (c.length > 0 && c[0] != null) {
- return c[0].getConstraints();
- }
- }
- return null;
+ if (constraints instanceof Contact[]) {
+ Contact[] c = (Contact[]) constraints;
+ if (c.length > 0 && c[0] != null) {
+ return c[0].getConstraints();
+ }
+ }
+ return null;
}
public void enqueue(Task task, Object constraints) {
@@ -215,34 +219,29 @@
}
public void run() {
+ Queue queue = getJobQueue();
+ Queue.Cursor c = queue.cursor();
while (!isDone()) {
- synchronized (this) {
- while (getJobQueue().isEmpty() || (running >= getMaxSimultaneousJobs())) {
+ while (queue.isEmpty() || (running >= getMaxSimultaneousJobs())) {
+ synchronized (this) {
if (!sleep()) {
return;
}
}
}
- boolean success = false;
- int index = 0;
- while (!success) {
- if (index >= getJobQueue().size()) {
- synchronized (this) {
- if (!sleep()) {
- return;
- }
- }
- break;
- }
- Task t = (Task) getJobQueue().get(index);
+ synchronized (this) {
+ tasksFinished = false;
+ c.reset();
+ }
+ while (c.hasNext()) {
+ Task t = (Task) c.next();
boolean remove = true;
try {
submitUnbound(t);
- success = true;
}
catch (NoSuchResourceException e) {
failTask(t, "Could not find any valid host for task \"" + t
- + "\" with constraints " + getTaskConstraints(t), e);
+ + "\" with constraints " + getTaskConstraints(t), e);
}
catch (NoFreeResourceException e) {
remove = false;
@@ -251,18 +250,28 @@
failTask(t, "The scheduler could not execute the task", e);
}
if (remove) {
- synchronized (this) {
- getJobQueue().remove(index);
+ c.remove();
+ }
+ else {
+ while (!c.hasNext()) {
+ synchronized (this) {
+ if (!sleep()) {
+ return;
+ }
+ if (tasksFinished) {
+ tasksFinished = false;
+ c.reset();
+ }
+ }
}
}
- index++;
}
}
}
private boolean sleep() {
try {
- wait(250);
+ wait(2000);
return true;
}
catch (InterruptedException e) {
@@ -346,8 +355,8 @@
}
catch (Exception e) {
if (logger.isDebugEnabled()) {
- logger.debug("Scheduler exception: job =" + t.getIdentity().getValue() + ", status = "
- + t.getStatus(), e);
+ logger.debug("Scheduler exception: job =" + t.getIdentity().getValue()
+ + ", status = " + t.getStatus(), e);
}
failTask(t, e.toString(), e);
return;
@@ -521,7 +530,8 @@
Task task = (Task) e.getSource();
Status status = e.getStatus();
int code = status.getStatusCode();
- if (!taskContacts.containsKey(e.getSource())) {
+ Contact[] contacts = (Contact[]) taskContacts.get(task);
+ if (contacts == null) {
return;
}
if (code == Status.COMPLETED) {
@@ -537,12 +547,12 @@
}
if (status.isTerminal()) {
synchronized (this) {
+ tasksFinished = true;
decRunning();
task.removeStatusListener(this);
if (task.getType() == Task.FILE_TRANSFER) {
currentTransfers--;
}
- Contact[] contacts = (Contact[]) taskContacts.get(task);
synchronized (taskContacts) {
taskContacts.remove(task);
}
@@ -553,14 +563,27 @@
c.setActiveTasks(c.getActiveTasks() - 1);
}
+ TaskHandler handler = getHandler(task);
try {
- TaskHandler handler = getHandler(task);
handler.remove(task);
}
catch (ActiveTaskException e1) {
- e1.printStackTrace();
- Throwable t = new RuntimeException("Something is wrong here", e1);
- t.printStackTrace();
+ /*
+ * I think this is the out-of-order status events
+ * phenomenon, where a task enters the ACTIVE state
+ * after being COMPLETED. The good news is that it
+ * should enter the ACTIVE state only once.
+ */
+ task.getStatus().setStatusCode(code);
+ try {
+ handler.remove(task);
+ }
+ catch (ActiveTaskException e2) {
+ // now it's really weird
+ e1.printStackTrace();
+ Throwable t = new RuntimeException("Something is wrong here", e1);
+ t.printStackTrace();
+ }
}
finally {
removeHandler(task);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|