You can subscribe to this list here.
Messages posted per month:

| Year | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2006 | | | | 39 | 165 | 164 | 127 | 81 | 146 | 375 | 241 | 77 |
| 2007 | 42 | 38 | 30 | 6 | 17 | | 15 | 59 | 31 | 44 | 30 | 12 |
| 2008 | 9 | 63 | 18 | 43 | 28 | 32 | 61 | 5 | 72 | 48 | 6 | |
From: <ha...@us...> - 2008-09-23 04:42:44
|
Revision: 2186 http://cogkit.svn.sourceforge.net/cogkit/?rev=2186&view=rev Author: hategan Date: 2008-09-23 04:42:38 +0000 (Tue, 23 Sep 2008) Log Message: ----------- log node before trying to connect (D'oh!);some other minor changes Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/resources/worker.pl Modified: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl =================================================================== --- trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-09-23 04:39:59 UTC (rev 2185) +++ trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-09-23 04:42:38 UTC (rev 2186) @@ -38,7 +38,7 @@ my %HANDLERS = ( - "SHUTDOWN" => \&shutdown, + "SHUTDOWN" => \&shutdownw, "SUBMITJOB" => \&submitjob, "REGISTER" => \&register, "HEARTBEAT" => \&heartbeat, @@ -85,7 +85,7 @@ last; } else { - wlog "Connection failed\n"; + wlog "Connection failed: $!\n"; select(undef, undef, undef, 2 ** $i); } } @@ -263,7 +263,7 @@ $SOCK->recv($data, 12); if (length($data) > 0) { wlog "Received $data\n"; - eval { process(unpackData($data)); } || wlog "$@\n"; + eval { process(unpackData($data)); } || wlog "Failed to process data: $@\n"; } else { #sleep 250ms @@ -351,11 +351,11 @@ } -sub shutdown { +sub shutdownw { my ($tag, $timeout, $msgs) = @_; - + wlog "Shutdown command received\n"; sendReply($tag, ("OK")); - wlog "Shutdown command received. Exiting\n"; + wlog "Acknowledged shutdown. 
Exiting\n"; exit 0; } @@ -401,11 +401,11 @@ my $tag = shift; my $executable = $JOB{"executable"}; if (!(defined $JOBID)) { - sendReply($tag, ("Missing job identity")); + sendError($tag, ("Missing job identity")); return 0; } elsif (!(defined $executable)) { - sendReply($tag, ("Missing executable")); + sendError($tag, ("Missing executable")); return 0; } else { @@ -492,10 +492,11 @@ my $myhost=`hostname`; $myhost =~ s/\s+$//; - init(); - + wlog("Initialized coaster worker $i\n"); wlog("Running on node $myhost\n"); + + init(); mainloop(); exit(0); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-23 04:40:05
|
Revision: 2185 http://cogkit.svn.sourceforge.net/cogkit/?rev=2185&view=rev Author: hategan Date: 2008-09-23 04:39:59 +0000 (Tue, 23 Sep 2008) Log Message: ----------- plenty'o'stuff Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/CoasterQueueProcessor.java trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerKey.java trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java Added Paths: ----------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Seconds.java Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/CoasterQueueProcessor.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/CoasterQueueProcessor.java 2008-09-23 04:38:57 UTC (rev 2184) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/CoasterQueueProcessor.java 2008-09-23 04:39:59 UTC (rev 2185) @@ -11,12 +11,16 @@ import java.io.IOException; +import org.apache.log4j.Logger; import org.globus.cog.abstraction.impl.common.StatusImpl; import org.globus.cog.abstraction.impl.common.task.ServiceContactImpl; import org.globus.cog.abstraction.interfaces.Status; import org.globus.cog.abstraction.interfaces.TaskHandler; +import org.globus.cog.karajan.util.Queue; public class CoasterQueueProcessor extends QueueProcessor { + public static final Logger logger = Logger + .getLogger(CoasterQueueProcessor.class); private TaskHandler taskHandler; private WorkerManager workerManager; private 
String workdir; @@ -30,6 +34,25 @@ public void run() { try { + new Thread() { + { + setDaemon(true); + } + + public void run() { + while (true) { + try { + Thread.sleep(20000); + Queue q = getQueue(); + logger.info("Coaster queue: " + q); + } + catch (Exception e) { + e.printStackTrace(); + } + } + } + }.start(); + workerManager.start(); AssociatedTask at; while (!this.getShutdownFlag()) { @@ -54,7 +77,7 @@ } } if (hasWrapped()) { - synchronized(workerManager) { + synchronized (workerManager) { workerManager.wait(1000); } } Added: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Seconds.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Seconds.java (rev 0) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Seconds.java 2008-09-23 04:39:59 UTC (rev 2185) @@ -0,0 +1,65 @@ +//---------------------------------------------------------------------- +//This code is developed as part of the Java CoG Kit project +//The terms of the license can be found at http://www.cogkit.org/license +//This message may not be removed or altered. 
+//---------------------------------------------------------------------- + +/* + * Created on Sep 22, 2008 + */ +package org.globus.cog.abstraction.coaster.service.job.manager; + + +public final class Seconds { + private long seconds; + public static final Seconds NEVER = new Seconds(Long.MAX_VALUE); + + public Seconds(long seconds) { + this.seconds = seconds; + } + + public static Seconds now() { + return new Seconds(System.currentTimeMillis() / 1000); + } + + public long getSeconds() { + return seconds; + } + + public Seconds add(Seconds s) { + return new Seconds(seconds + s.seconds); + } + + public Seconds multiply(int factor) { + return new Seconds(seconds * factor); + } + + public Seconds subtract(Seconds s) { + return new Seconds(seconds - s.seconds) ; + } + + public Seconds divide(int d) { + return new Seconds(seconds / d); + } + + public long toMilliseconds() { + return seconds * 1000; + } + + public String toString() { + return seconds + "s"; + } + + public boolean equals(Object obj) { + if (obj instanceof Seconds) { + return seconds == ((Seconds) obj).seconds; + } + else { + return false; + } + } + + public int hashCode() { + return (int) seconds; + } +} Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java 2008-09-23 04:38:57 UTC (rev 2184) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/Worker.java 2008-09-23 04:39:59 UTC (rev 2185) @@ -9,6 +9,9 @@ */ package org.globus.cog.abstraction.coaster.service.job.manager; +import java.util.Timer; +import java.util.TimerTask; + import org.apache.log4j.Logger; import org.globus.cog.abstraction.impl.common.StatusEvent; import org.globus.cog.abstraction.impl.common.StatusImpl; @@ -21,33 +24,40 
@@ import org.globus.cog.karajan.workflow.service.commands.ShutdownCommand; public class Worker implements StatusListener { - public static final Logger logger = Logger.getLogger(Worker.class); - + public static final Logger logger = Logger.getLogger(Worker.class); + + private static final Timer timer; + + static { + timer = new Timer(); + } + + private static final Seconds SHUTDOWN_RESERVE = new Seconds(10); + private Task task, running; private String id; private WorkerManager manager; private boolean starting; - private Long scheduledTerminationTime; - private int maxWallTime; + private Seconds scheduledTerminationTime; + private Seconds maxWallTime; private Status error; private ChannelContext channelContext; - private static final Long NEVER = new Long(Long.MAX_VALUE); - - public Worker(WorkerManager manager, String id, int maxWallTime, Task w, - Task p) { + public Worker(WorkerManager manager, String id, Seconds maxWallTime, + Task w, Task p) { this.manager = manager; this.id = id; this.maxWallTime = maxWallTime; this.task = w; this.running = p; this.starting = true; - this.scheduledTerminationTime = NEVER; + this.scheduledTerminationTime = Seconds.NEVER; w.addStatusListener(this); } public void statusChanged(StatusEvent event) { Status s = event.getStatus(); + logger.warn("Worker " + id + " status change: " + s); int code = s.getStatusCode(); Task src = (Task) event.getSource(); if (code == Status.FAILED) { @@ -72,12 +82,12 @@ manager.workerTerminated(this); } } - + public Task getWorkerTask() { return task; } - public Task getRunning() { + public synchronized Task getRunning() { return running; } @@ -96,20 +106,36 @@ return id; } - public Long getScheduledTerminationTime() { + public Seconds getScheduledTerminationTime() { return scheduledTerminationTime; } - public int getMaxWallTime() { + public Seconds getMaxWallTime() { return maxWallTime; } - public void setScheduledTerminationTime(Long l) { - this.scheduledTerminationTime = l; + public void 
setScheduledTerminationTime(Seconds s) { + this.scheduledTerminationTime = s; + shutdownAfter(s.add(SHUTDOWN_RESERVE).subtract( + WorkerManager.TIME_RESERVE).toMilliseconds() + - System.currentTimeMillis()); } - public void setScheduledTerminationTime(long l) { - this.scheduledTerminationTime = new Long(l); + private void shutdownAfter(long ms) { + timer.schedule(new TimerTask() { + public void run() { + if (running == null) { + shutdown(); + } + else { + // what this really means is that a walltime spec was wrong + // and that the queuing system will likely kill the worker + // anyway + logger + .info("Worker still has a running task. Shutdown canceled."); + } + } + }, ms); } public Status getStatus() { @@ -130,13 +156,39 @@ public void shutdown() { try { + logger.info("Shutting down worker: " + this); KarajanChannel channel = ChannelManager.getManager() .reserveChannel(channelContext); ShutdownCommand sc = new ShutdownCommand(); sc.execute(channel); + logger.info("Worker shut down: " + this); } catch (Exception e) { - logger.warn("Failed to shut down worker", e); + logger + .warn( + "Failed to shut down worker nicely. 
Trying to cancel task.", + e); + try { + manager.getTaskHandler().cancel(task); + logger.info("Worker task canceled"); + } + catch (Exception ee) { + logger.info("Failed to cancel worker task", ee); + } } } + + public boolean isPastScheduledTerminationTime() { + return scheduledTerminationTime.toMilliseconds() + - System.currentTimeMillis() < 0; + } + + public synchronized void setRunning(Task task) { + this.running = task; + if (task == null && isPastScheduledTerminationTime()) { + // that's to avoid running the shutdown in some strange thread it + // shouldn't be running in + shutdownAfter(1); + } + } } Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerKey.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerKey.java 2008-09-23 04:38:57 UTC (rev 2184) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerKey.java 2008-09-23 04:39:59 UTC (rev 2185) @@ -11,13 +11,13 @@ public class WorkerKey implements Comparable { private Worker worker; - private long time; - + private Seconds time; + public WorkerKey(Worker worker) { this.worker = worker; } - - public WorkerKey(long time) { + + public WorkerKey(Seconds time) { this.time = time; } @@ -26,27 +26,26 @@ if (worker == null) { if (wk.worker == null) { - return sgn(time - wk.time); + return sgn(time.subtract(wk.time)); } else { - return sgn(time - - wk.worker.getScheduledTerminationTime().longValue()); + return sgn(time.subtract(wk.worker + .getScheduledTerminationTime())); } } else { if (wk.worker == null) { - return sgn(worker.getScheduledTerminationTime().longValue() - - wk.time); + return sgn(worker.getScheduledTerminationTime().subtract( + wk.time)); } else { if (worker == wk.worker) { return 0; } else { - int dif = 
sgn(worker.getScheduledTerminationTime() - .longValue() - - wk.worker.getScheduledTerminationTime() - .longValue()); + int dif = sgn(worker + .getScheduledTerminationTime() + .subtract(wk.worker.getScheduledTerminationTime())); if (dif != 0) { return dif; } @@ -59,7 +58,8 @@ } } - private int sgn(long val) { + private int sgn(Seconds s) { + long val = s.getSeconds(); if (val < 0) { return -1; } @@ -71,4 +71,13 @@ } } + public String toString() { + if (worker == null) { + return "/" + time; + } + else { + return worker.getId() + "/" + + worker.getScheduledTerminationTime(); + } + } } Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-09-23 04:38:57 UTC (rev 2184) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-09-23 04:39:59 UTC (rev 2185) @@ -54,7 +54,7 @@ * We allow for at least one minute of extra time compared to the requested * walltime */ - public static final int TIME_RESERVE = 60; + public static final Seconds TIME_RESERVE = new Seconds(60); public static final File scriptDir = new File(System .getProperty("user.home") @@ -126,35 +126,9 @@ public void run() { try { - new Thread() { - { - setDaemon(true); - } - - public void run() { - while (true) { - try { - Thread.sleep(20000); - synchronized (WorkerManager.this) { - logger.info("Current workers: " - + currentWorkers); - logger.info("Ready: " + ready); - logger.info("Busy: " + busy); - logger.info("Requested: " + requested); - logger.info("Starting: " + startingTasks); - logger.info("Ids: " + ids); - } - synchronized (allocationRequests) { - logger.info("AllocationR: " - + allocationRequests); - } - } - catch (Exception e) { - 
e.printStackTrace(); - } - } - } - }.start(); + if (logger.isInfoEnabled()) { + startInfoThread(); + } AllocationRequest req; while (!shutdownFlag) { synchronized (allocationRequests) { @@ -168,9 +142,9 @@ } } try { - startWorker(req.maxWallTime.getSeconds() - * OVERALLOCATION_FACTOR + TIME_RESERVE, - req.prototype); + startWorker(new Seconds(req.maxWallTime.getSeconds()) + .multiply(OVERALLOCATION_FACTOR) + .add(TIME_RESERVE), req.prototype); } catch (NoClassDefFoundError e) { req.prototype.setStatus(new StatusImpl(Status.FAILED, e @@ -191,7 +165,7 @@ } } - private void startWorker(int maxWallTime, Task prototype) + private void startWorker(Seconds maxWallTime, Task prototype) throws InvalidServiceContactException, InvalidProviderException, ProviderMethodException { String numWorkersString = (String) ((JobSpecification) prototype @@ -289,7 +263,7 @@ return s; } - private void copyAttributes(Task t, Task prototype, int maxWallTime) { + private void copyAttributes(Task t, Task prototype, Seconds maxWallTime) { JobSpecification pspec = (JobSpecification) prototype .getSpecification(); JobSpecification tspec = (JobSpecification) t.getSpecification(); @@ -300,7 +274,8 @@ tspec.setAttribute(name, pspec.getAttribute(name)); } } - tspec.setAttribute("maxwalltime", new WallTime(maxWallTime).format()); + tspec.setAttribute("maxwalltime", new WallTime((int) maxWallTime + .getSeconds()).format()); } private int k; @@ -308,9 +283,13 @@ public Worker request(WallTime maxWallTime, Task prototype) throws InterruptedException { - WorkerKey key = new WorkerKey(maxWallTime.getSeconds() + TIME_RESERVE - + now()); + WorkerKey key = new WorkerKey(new Seconds(maxWallTime.getSeconds()) + .add(TIME_RESERVE).add(Seconds.now())); Worker w = null; + if (logger.isDebugEnabled()) { + logger.debug("Looking for worker for key " + key); + logger.debug("Ready: " + ready); + } synchronized (this) { Collection tm = ready.tailMap(key).values(); Iterator i = tm.iterator(); @@ -324,7 +303,6 @@ } if (w 
!= null) { - System.err.print("."); if (k == 0) { last = System.currentTimeMillis(); } @@ -338,6 +316,7 @@ System.err.println(" " + k / 80 + "; " + js + " J/s"); } logger.info("Using worker " + w + " for task " + prototype); + w.setRunning(prototype); return w; } else { @@ -376,10 +355,6 @@ } } - private long now() { - return System.currentTimeMillis() / 1000; - } - public void workerTerminated(Worker worker) { logger.warn("Worker terminated: " + worker); Status s = worker.getStatus(); @@ -387,7 +362,8 @@ synchronized (this) { requested.remove(worker.getId()); startingTasks.remove(worker.getRunning()); - //this will cause all the jobs associated with the worker to fail + // this will cause all the jobs associated with the worker to + // fail ready.put(new WorkerKey(worker), worker); } } @@ -410,17 +386,17 @@ + "). This worker manager instance does not " + "recall requesting a worker with such an id."); } - wr.workerRegistered(); - wr.setScheduledTerminationTime(now() + wr.getMaxWallTime() - - TIME_RESERVE); + wr.setScheduledTerminationTime(Seconds.now().add(wr.getMaxWallTime())); wr.setChannelContext(cc); + if (logger.isInfoEnabled()) { + logger.info("Worker registration received: " + wr); + } synchronized (this) { startingTasks.remove(wr.getRunning()); ready.put(new WorkerKey(wr), wr); ids.put(id, wr); + wr.workerRegistered(); } - System.err.println("RR. 
ready: " + ready.size() + ", busy: " - + busy.size()); } public void removeWorker(Worker worker) { @@ -438,6 +414,7 @@ busy.remove(wr); ready.put(new WorkerKey(wr), wr); notifyAll(); + wr.setRunning(null); } } @@ -457,6 +434,10 @@ } } + protected TaskHandler getTaskHandler() { + return handler; + } + private static class AllocationRequest { public WallTime maxWallTime; public Task prototype; @@ -488,4 +469,35 @@ } } } + + private void startInfoThread() { + new Thread() { + { + setDaemon(true); + } + + public void run() { + while (true) { + try { + Thread.sleep(20000); + + synchronized (WorkerManager.this) { + logger.info("Current workers: " + currentWorkers); + logger.info("Ready: " + ready); + logger.info("Busy: " + busy); + logger.info("Requested: " + requested); + logger.info("Starting: " + startingTasks); + logger.info("Ids: " + ids); + } + synchronized (allocationRequests) { + logger.info("AllocationR: " + allocationRequests); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } + } + }.start(); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-23 04:39:02
|
Revision: 2184 http://cogkit.svn.sourceforge.net/cogkit/?rev=2184&view=rev Author: hategan Date: 2008-09-23 04:38:57 +0000 (Tue, 23 Sep 2008) Log Message: ----------- merged forgotten commit Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java 2008-09-23 04:36:32 UTC (rev 2183) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java 2008-09-23 04:38:57 UTC (rev 2184) @@ -9,11 +9,10 @@ */ package org.globus.cog.abstraction.coaster.service.job.manager; +import org.globus.cog.abstraction.impl.common.execution.WallTime; import org.globus.cog.abstraction.interfaces.JobSpecification; import org.globus.cog.abstraction.interfaces.Task; -import org.globus.cog.abstraction.impl.common.execution.WallTime; - public class AssociatedTask { public final Task task; public final WallTime maxWallTime; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-23 04:36:38
|
Revision: 2183 http://cogkit.svn.sourceforge.net/cogkit/?rev=2183&view=rev Author: hategan Date: 2008-09-23 04:36:32 +0000 (Tue, 23 Sep 2008) Log Message: ----------- added shutdown watchdog Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-09-23 00:36:41 UTC (rev 2182) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-09-23 04:36:32 UTC (rev 2183) @@ -22,7 +22,6 @@ import org.globus.cog.karajan.workflow.service.RemoteConfiguration; import org.globus.cog.karajan.workflow.service.RequestManager; import org.globus.cog.karajan.workflow.service.ServiceRequestManager; -import org.globus.cog.karajan.workflow.service.channels.ChannelException; import org.globus.cog.karajan.workflow.service.channels.ChannelManager; import org.globus.cog.karajan.workflow.service.channels.KarajanChannel; import org.globus.gsi.gssapi.auth.SelfAuthorization; @@ -172,12 +171,35 @@ } public void shutdown() { + startShutdownWatchdog(); super.shutdown(); jobQueue.getWorkerManager().shutdown(); done = true; logger.info("Shutdown sequence completed"); } + private void startShutdownWatchdog() { + new Thread() { + { + setName("Shutdown watchdog"); + setDaemon(true); + } + + public void run() { + try { + Thread.sleep(5 * 60 * 1000); + logger + .info("Shutdown failed after 5 minutes. 
Forcefully shutting down"); + System.exit(3); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + + }.start(); + } + public JobQueue getJobQueue() { return jobQueue; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-23 00:36:46
|
Revision: 2182 http://cogkit.svn.sourceforge.net/cogkit/?rev=2182&view=rev Author: hategan Date: 2008-09-23 00:36:41 +0000 (Tue, 23 Sep 2008) Log Message: ----------- removing debugging statement Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java 2008-09-23 00:35:53 UTC (rev 2181) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java 2008-09-23 00:36:41 UTC (rev 2182) @@ -38,7 +38,6 @@ if (message != null && !message.equals("")) { s.setMessage(message); } - logger.error("Job " + jobId + " is " + status); NotificationManager.getDefault().notificationReceived(jobId, s); sendReply("OK"); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-23 00:36:00
|
Revision: 2181 http://cogkit.svn.sourceforge.net/cogkit/?rev=2181&view=rev Author: hategan Date: 2008-09-23 00:35:53 +0000 (Tue, 23 Sep 2008) Log Message: ----------- added some logging info Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-09-22 20:33:06 UTC (rev 2180) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-09-23 00:35:53 UTC (rev 2181) @@ -119,6 +119,7 @@ } private void stop(Exception e) { + jobQueue.getWorkerManager().shutdown(); synchronized (this) { this.e = e; done = true; @@ -174,6 +175,7 @@ super.shutdown(); jobQueue.getWorkerManager().shutdown(); done = true; + logger.info("Shutdown sequence completed"); } public JobQueue getJobQueue() { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-22 20:33:10
|
Revision: 2180 http://cogkit.svn.sourceforge.net/cogkit/?rev=2180&view=rev Author: hategan Date: 2008-09-22 20:33:06 +0000 (Mon, 22 Sep 2008) Log Message: ----------- toString Modified Paths: -------------- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/util/Queue.java Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/util/Queue.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/util/Queue.java 2008-09-22 19:07:49 UTC (rev 2179) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/util/Queue.java 2008-09-22 20:33:06 UTC (rev 2180) @@ -51,6 +51,20 @@ public Cursor cursor() { return new C(); } + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append('['); + Cursor c = cursor(); + while (c.hasNext()) { + sb.append(c.next()); + if (c.hasNext()) { + sb.append(", "); + } + } + sb.append(']'); + return sb.toString(); + } private class Entry { private final Object obj; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <b_...@us...> - 2008-09-22 19:08:03
|
Revision: 2179 http://cogkit.svn.sourceforge.net/cogkit/?rev=2179&view=rev Author: b_z_c Date: 2008-09-22 19:07:49 +0000 (Mon, 22 Sep 2008) Log Message: ----------- r2174 messed up walltime class path so that coasters do not compile Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java 2008-09-20 21:13:50 UTC (rev 2178) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/AssociatedTask.java 2008-09-22 19:07:49 UTC (rev 2179) @@ -12,6 +12,8 @@ import org.globus.cog.abstraction.interfaces.JobSpecification; import org.globus.cog.abstraction.interfaces.Task; +import org.globus.cog.abstraction.impl.common.execution.WallTime; + public class AssociatedTask { public final Task task; public final WallTime maxWallTime; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <li...@us...> - 2008-09-20 21:13:57
|
Revision: 2178 http://cogkit.svn.sourceforge.net/cogkit/?rev=2178&view=rev Author: liuwt Date: 2008-09-20 21:13:50 +0000 (Sat, 20 Sep 2008) Log Message: ----------- Modified Paths: -------------- trunk/current/src/cog/modules/transfer-gui/src/org/globus/transfer/reliable/client/credential/CredentialDialog.java Modified: trunk/current/src/cog/modules/transfer-gui/src/org/globus/transfer/reliable/client/credential/CredentialDialog.java =================================================================== --- trunk/current/src/cog/modules/transfer-gui/src/org/globus/transfer/reliable/client/credential/CredentialDialog.java 2008-09-19 20:28:59 UTC (rev 2177) +++ trunk/current/src/cog/modules/transfer-gui/src/org/globus/transfer/reliable/client/credential/CredentialDialog.java 2008-09-20 21:13:50 UTC (rev 2178) @@ -83,11 +83,14 @@ proxyInitFrame.setLocation(50, 50); proxyInitFrame.pack(); //UITools.center(mainPanel, proxyInitFrame); + this.setVisible(false); proxyInitFrame.setVisible(true); + } private void MyproxyActionPerformed(java.awt.event.ActionEvent evt) { MyProxyLogonGUI.main(null); + this.setVisible(false); } /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:56:34
|
Revision: 2172 http://cogkit.svn.sourceforge.net/cogkit/?rev=2172&view=rev Author: hategan Date: 2008-09-16 17:56:30 +0000 (Tue, 16 Sep 2008) Log Message: ----------- ... Modified Paths: -------------- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ConnectionHandler.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/GSSService.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RemoteConfiguration.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Service.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ServiceRequestManager.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UDPService.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/ChannelConfigurationHandler.java Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java 2008-09-16 17:56:08 UTC (rev 2171) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java 2008-09-16 17:56:30 UTC (rev 2172) @@ -113,15 +113,6 @@ port = 1984; } channel = ChannelFactory.newChannel(contact, sc, requestManager); - URI callbackURI = null; - if (sc.getConfiguration().hasOption(RemoteConfiguration.CALLBACK)) { - callbackURI = channel.getCallbackURI(); - } - String remoteID = getChannel().getChannelContext().getChannelID().getRemoteID(); - - ChannelConfigurationCommand ccc = new ChannelConfigurationCommand( - sc.getConfiguration(), 
callbackURI); - ccc.execute(this.getChannel()); connected = true; } catch (Exception e) { Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ConnectionHandler.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ConnectionHandler.java 2008-09-16 17:56:08 UTC (rev 2171) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ConnectionHandler.java 2008-09-16 17:56:30 UTC (rev 2172) @@ -9,6 +9,7 @@ */ package org.globus.cog.karajan.workflow.service; +import java.io.IOException; import java.net.Socket; import org.apache.log4j.Logger; @@ -25,11 +26,11 @@ private final AbstractTCPChannel channel; private final RequestManager requestManager; - public ConnectionHandler(Service service, Socket socket) { + public ConnectionHandler(Service service, Socket socket) throws IOException { this(service, socket, null); } - public ConnectionHandler(Service service, Socket socket, RequestManager requestManager) { + public ConnectionHandler(Service service, Socket socket, RequestManager requestManager) throws IOException { this.socket = socket; this.requestManager = requestManager == null ? 
new ServiceRequestManager() : requestManager; if (socket instanceof GssSocket) { @@ -45,5 +46,5 @@ socket.setKeepAlive(true); socket.setSoTimeout(0); channel.start(); - } + } } Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/GSSService.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/GSSService.java 2008-09-16 17:56:08 UTC (rev 2171) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/GSSService.java 2008-09-16 17:56:30 UTC (rev 2172) @@ -22,6 +22,9 @@ import org.globus.cog.abstraction.interfaces.JobSpecification; import org.globus.cog.abstraction.interfaces.Task; import org.globus.cog.abstraction.interfaces.TaskHandler; +import org.globus.cog.karajan.workflow.service.channels.ChannelException; +import org.globus.cog.karajan.workflow.service.channels.ChannelManager; +import org.globus.cog.karajan.workflow.service.channels.KarajanChannel; import org.globus.cog.util.ArgumentParser; import org.globus.cog.util.ArgumentParserException; import org.globus.cog.util.GridMap; @@ -41,7 +44,7 @@ private boolean restricted; private URI contact; private RequestManager requestManager; - + public GSSService() throws IOException { super(); } @@ -76,11 +79,11 @@ this._server = ServerSocketFactory.getDefault().createServerSocket(port, 50, bindTo); } } - + protected void setRequestManager(RequestManager requestManager) { this.requestManager = requestManager; } - + public void initialize() { // prevent the server from being started by BaseServer constructors } @@ -123,11 +126,12 @@ if (serverThread == null) { accept = true; serverThread = new Thread(this); + serverThread.setDaemon(true); serverThread.setName("Server: " + getContact()); serverThread.start(); } } - + public String toString() { return String.valueOf(contact); } @@ -270,4 +274,9 @@ System.exit(1); } } + + public void 
irrecoverableChannelError(KarajanChannel channel, Exception e) { + System.err.println("Irrecoverable channel exception: " + e.getMessage()); + System.exit(2); + } } Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RemoteConfiguration.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RemoteConfiguration.java 2008-09-16 17:56:08 UTC (rev 2171) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RemoteConfiguration.java 2008-09-16 17:56:30 UTC (rev 2172) @@ -29,6 +29,7 @@ public static final String RECONNECT = "reconnect"; public static final String POLL = "poll"; public static final String BUFFER = "buffer"; + public static final String HEARTBEAT = "heartbeat"; private List entries; private static final Entry DEFAULT; @@ -75,9 +76,11 @@ } public Entry find(String host) { + logger.warn("Find: " + host); Iterator i = entries.iterator(); while (i.hasNext()) { Entry e = (Entry) i.next(); + logger.warn("Find: " + e.getUnparsed() + " - " + host); if (e.compiled.matcher(host).matches()) { return e; } Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java 2008-09-16 17:56:08 UTC (rev 2171) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java 2008-09-16 17:56:30 UTC (rev 2172) @@ -191,6 +191,10 @@ public KarajanChannel getChannel() { return channel; } + + protected void setChannel(KarajanChannel channel) { + this.channel = channel; + } public List getOutData() { return outData; Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Service.java 
=================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Service.java 2008-09-16 17:56:08 UTC (rev 2171) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Service.java 2008-09-16 17:56:30 UTC (rev 2172) @@ -11,6 +11,8 @@ import java.net.URI; +import org.globus.cog.karajan.workflow.service.channels.KarajanChannel; + public interface Service { boolean isRestricted(); @@ -19,4 +21,5 @@ ServiceContext getContext(); + void irrecoverableChannelError(KarajanChannel channel, Exception e); } Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ServiceRequestManager.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ServiceRequestManager.java 2008-09-16 17:56:08 UTC (rev 2171) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ServiceRequestManager.java 2008-09-16 17:56:30 UTC (rev 2172) @@ -12,6 +12,7 @@ import org.globus.cog.karajan.workflow.service.handlers.ChannelConfigurationHandler; import org.globus.cog.karajan.workflow.service.handlers.EchoHandler; import org.globus.cog.karajan.workflow.service.handlers.EventHandler; +import org.globus.cog.karajan.workflow.service.handlers.HeartBeatHandler; import org.globus.cog.karajan.workflow.service.handlers.ShutdownHandler; import org.globus.cog.karajan.workflow.service.handlers.StartGroupHandler; import org.globus.cog.karajan.workflow.service.handlers.StartHandler; @@ -35,5 +36,6 @@ addHandler("SHUTDOWN", ShutdownHandler.class); addHandler("STAT", StatHandler.class); addHandler("SUBMIT", SubmitHandler.class); + addHandler(HeartBeatHandler.NAME, HeartBeatHandler.class); } } Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UDPService.java 
=================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UDPService.java 2008-09-16 17:56:08 UTC (rev 2171) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UDPService.java 2008-09-16 17:56:30 UTC (rev 2172) @@ -22,6 +22,7 @@ import org.globus.cog.karajan.workflow.service.channels.ChannelContext; import org.globus.cog.karajan.workflow.service.channels.ChannelException; import org.globus.cog.karajan.workflow.service.channels.ChannelManager; +import org.globus.cog.karajan.workflow.service.channels.KarajanChannel; import org.globus.cog.karajan.workflow.service.channels.UDPChannel; public class UDPService implements Service, Runnable { @@ -122,6 +123,10 @@ } } + public void irrecoverableChannelError(KarajanChannel channel, Exception e) { + e.printStackTrace(); + } + public static void main(String[] args) { try { RequestManager rm = new ServiceRequestManager(); Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/ChannelConfigurationHandler.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/ChannelConfigurationHandler.java 2008-09-16 17:56:08 UTC (rev 2171) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/ChannelConfigurationHandler.java 2008-09-16 17:56:30 UTC (rev 2172) @@ -60,7 +60,7 @@ Iterator i = conf.getOptions().iterator(); while (i.hasNext()) { String opt = (String) i.next(); - if (opt.equals(RemoteConfiguration.POLL)) { + if (opt.equals(RemoteConfiguration.POLL) || opt.equals(RemoteConfiguration.RECONNECT)) { newopts.put(RemoteConfiguration.BUFFER, null); } else if (opt.equals(RemoteConfiguration.CALLBACK)) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source 
development site. |
From: <ha...@us...> - 2008-09-16 10:56:14
|
Revision: 2171 http://cogkit.svn.sourceforge.net/cogkit/?rev=2171&view=rev Author: hategan Date: 2008-09-16 17:56:08 +0000 (Tue, 16 Sep 2008) Log Message: ----------- ... Modified Paths: -------------- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractTCPChannel.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/BufferingChannel.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelManager.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSChannel.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/NullChannel.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/TCPChannel.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/UDPChannel.java Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java 2008-09-16 17:54:08 UTC (rev 2170) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java 2008-09-16 17:56:08 UTC (rev 2171) @@ -15,17 +15,21 @@ import java.net.URI; import 
java.util.LinkedList; import java.util.List; +import java.util.TimerTask; import org.apache.log4j.Logger; import org.globus.cog.karajan.workflow.service.NoSuchHandlerException; import org.globus.cog.karajan.workflow.service.ProtocolException; +import org.globus.cog.karajan.workflow.service.RemoteConfiguration; import org.globus.cog.karajan.workflow.service.RequestManager; import org.globus.cog.karajan.workflow.service.Service; +import org.globus.cog.karajan.workflow.service.RemoteConfiguration.Entry; import org.globus.cog.karajan.workflow.service.commands.Command; import org.globus.cog.karajan.workflow.service.handlers.RequestHandler; public abstract class AbstractKarajanChannel implements KarajanChannel { private static final Logger logger = Logger.getLogger(AbstractKarajanChannel.class); + public static final int DEFAULT_HEARTBEAT_INTERVAL = 5 * 60; //seconds private ChannelContext context; private volatile int usageCount, longTermUsageCount; @@ -34,8 +38,10 @@ private boolean localShutdown, closed; private String name; private Service callbackService; + private final boolean client; - protected AbstractKarajanChannel(RequestManager requestManager, ChannelContext channelContext) { + protected AbstractKarajanChannel(RequestManager requestManager, ChannelContext channelContext, + boolean client) { if (channelContext != null) { this.context = channelContext; } @@ -44,8 +50,47 @@ } this.requestManager = requestManager; registeredMaps = new LinkedList(); + this.client = client; + configureHeartBeat(); } + protected void configureHeartBeat() { + TimerTask heartBeatTask; + Entry config = context.getConfiguration(); + int heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL; + if (config != null && config.hasOption(RemoteConfiguration.HEARTBEAT)) { + if (config.hasArg(RemoteConfiguration.HEARTBEAT)) { + heartBeatInterval = Integer.parseInt(config.getArg(RemoteConfiguration.HEARTBEAT)); + } + heartBeatInterval *= 1000; + } + if (!isOffline() && isClient()) { + heartBeatTask = 
new HeartBeatTask(this); + context.getTimer().schedule(heartBeatTask, heartBeatInterval, heartBeatInterval); + } + else { + if (logger.isInfoEnabled()) { + if (config == null) { + logger.info(this + ": Disabling heartbeats (config is null)"); + } + else if (!config.hasOption(RemoteConfiguration.HEARTBEAT)) { + logger.info(this + ": Disabling heartbeats (disabled in config)"); + } + else if (isOffline()) { + logger.info(this + ": Disabling heartbeats (offline channel)"); + } + else if (!isClient()) { + logger.info(this + ": Disabling heartbeats (not a client)"); + } + } + } + if (!isOffline() && !isClient()) { + int mult = 2; + heartBeatTask = new HeartBeatCheckTask(this, heartBeatInterval, mult); + context.getTimer().schedule(heartBeatTask, mult * heartBeatInterval, mult * heartBeatInterval); + } + } + public void registerCommand(Command cmd) throws ProtocolException { context.registerCommand(cmd); cmd.register(this); @@ -106,7 +151,7 @@ } return crt + count; } - + public static void pack(byte[] buf, int offset, int value) { buf[offset] = (byte) (value & 0xff); buf[offset + 1] = (byte) ((value >> 8) & 0xff); @@ -141,7 +186,7 @@ public RequestManager getRequestManager() { return requestManager; } - + public void setRequestManager(RequestManager rm) { if (rm == null) { throw new IllegalArgumentException("The request manager cannot be null"); @@ -189,7 +234,7 @@ } public boolean isClient() { - return false; + return client; } public String getName() { @@ -223,14 +268,17 @@ protected void handleReply(int tag, boolean fin, boolean error, int len, byte[] data) { if (logger.isDebugEnabled()) { - logger.debug(this + " REPL<: tag = " + tag + ", fin = " + fin - + ", err = " + error + ", datalen = " + len + ", data = " + ppByteBuf(data)); + logger.debug(this + " REPL<: tag = " + tag + ", fin = " + fin + ", err = " + error + + ", datalen = " + len + ", data = " + ppByteBuf(data)); } Command cmd = getChannelContext().getRegisteredCommand(tag); if (cmd != null) { try { 
cmd.replyReceived(data); if (fin) { + if (logger.isInfoEnabled()) { + logger.info(this + " REPL: " + cmd); + } if (error) { cmd.errorReceived(); } @@ -249,15 +297,15 @@ unregisteredSender(tag); } } - + protected void unregisteredSender(int tag) { logger.warn(getName() + " Recieved reply to unregistered sender. Tag: " + tag); } protected void handleRequest(int tag, boolean fin, boolean error, int len, byte[] data) { if (logger.isDebugEnabled()) { - logger.debug(this + " REQ<: tag = " + tag + ", fin = " + fin - + ", err = " + error + ", datalen = " + len + ", data = " + ppByteBuf(data)); + logger.debug(this + " REQ<: tag = " + tag + ", fin = " + fin + ", err = " + error + + ", datalen = " + len + ", data = " + ppByteBuf(data)); } RequestHandler handler = getChannelContext().getRegisteredHandler(tag); try { @@ -278,8 +326,11 @@ } if (fin) { try { + if (logger.isInfoEnabled()) { + logger.info(this + " REQ: " + handler); + } if (error) { - // TODO + handler.errorReceived(); } else { handler.receiveCompleted(); Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractTCPChannel.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractTCPChannel.java 2008-09-16 17:54:08 UTC (rev 2170) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractTCPChannel.java 2008-09-16 17:56:08 UTC (rev 2171) @@ -9,29 +9,34 @@ */ package org.globus.cog.karajan.workflow.service.channels; +import java.io.IOException; import java.net.Socket; +import java.net.URI; +import org.globus.cog.karajan.workflow.service.ProtocolException; +import org.globus.cog.karajan.workflow.service.RemoteConfiguration; import org.globus.cog.karajan.workflow.service.RequestManager; +import org.globus.cog.karajan.workflow.service.commands.ChannelConfigurationCommand; public abstract class 
AbstractTCPChannel extends AbstractStreamKarajanChannel implements Runnable { private Socket socket; private boolean started; private Exception startException; - private final boolean client; private boolean closing; public AbstractTCPChannel(RequestManager requestManager, ChannelContext channelContext, boolean client) { - super(requestManager, channelContext); - this.client = client; + super(requestManager, channelContext, client); } - protected void setSocket(Socket socket) { + protected void setSocket(Socket socket) throws IOException { this.socket = socket; + setInputStream(socket.getInputStream()); + setOutputStream(socket.getOutputStream()); } public synchronized void start() throws ChannelException { - if (client) { + if (isClient()) { setName("C(" + socket.getLocalAddress() + ")"); } else { @@ -50,14 +55,20 @@ throw new ChannelException(startException); } logger.info(getContact() + "Channel started"); + if (isClient()) { + try { + configure(); + } + catch (Exception e) { + throw new ChannelException("Failed to configure channel", e); + } + } } public void run() { ChannelContext context = getChannelContext(); try { try { - setInputStream(socket.getInputStream()); - setOutputStream(socket.getOutputStream()); started = true; } catch (Exception e) { @@ -85,7 +96,11 @@ } + public void shutdown() { + if (isLocalShutdown()) { + return; + } try { setLocalShutdown(); ChannelManager.getManager().shutdownChannel(this); @@ -94,13 +109,13 @@ logger.debug("Channel already shutting down"); } catch (Exception e) { - logger.warn(getContact() + "Could not shutdown channel", e); + logger.warn(getContact() + ": Could not shutdown channel", e); } super.close(); synchronized (this) { notify(); } - logger.info(getContact() + "Channel terminated"); + logger.info(getContact() + ": Channel terminated"); } public void close() { @@ -108,19 +123,15 @@ try { if (!socket.isClosed()) { socket.close(); - logger.info(getContact() + "Channel shut down"); + logger.info(getContact() + ": 
Channel shut down"); } } catch (Exception e) { - logger.warn(getContact() + "Failed to close socket", e); + logger.warn(getContact() + ": Failed to close socket", e); } super.close(); } - public boolean isClient() { - return client; - } - public boolean isStarted() { return started; } Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/BufferingChannel.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/BufferingChannel.java 2008-09-16 17:54:08 UTC (rev 2170) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/BufferingChannel.java 2008-09-16 17:56:08 UTC (rev 2171) @@ -25,7 +25,7 @@ private List buffer; public BufferingChannel(ChannelContext channelContext) { - super(null, channelContext); + super(null, channelContext, false); buffer = new ArrayList(); } @@ -70,9 +70,13 @@ } public boolean isOffline() { - return false; + return true; } + public boolean isStarted() { + return true; + } + public String toString() { return "BufferingChannel"; } Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java 2008-09-16 17:54:08 UTC (rev 2170) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java 2008-09-16 17:56:08 UTC (rev 2171) @@ -45,6 +45,8 @@ private ServiceContext serviceContext; private GSSCredential cred; private ChannelAttributes attr; + private int reconnectionAttempts; + private long lastHeartBeat; public ChannelContext() { this(new ServiceContext(null)); @@ -219,4 +221,24 @@ data.put(name, o); } } + + public int getReconnectionAttempts() { + return 
reconnectionAttempts; + } + + public void setReconnectionAttempts(int reconnectionAttempts) { + this.reconnectionAttempts = reconnectionAttempts; + } + + public long getLastHeartBeat() { + return lastHeartBeat; + } + + public void setLastHeartBeat(long lastHeartBeat) { + this.lastHeartBeat = lastHeartBeat; + } + + public String toString() { + return data.toString(); + } } Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelManager.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelManager.java 2008-09-16 17:54:08 UTC (rev 2170) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelManager.java 2008-09-16 17:56:08 UTC (rev 2171) @@ -19,6 +19,7 @@ import org.globus.cog.karajan.workflow.service.RemoteConfiguration; import org.globus.cog.karajan.workflow.service.RequestManager; import org.globus.cog.karajan.workflow.service.Service; +import org.globus.cog.karajan.workflow.service.UserContext; import org.ietf.jgss.GSSCredential; import org.ietf.jgss.GSSException; @@ -38,7 +39,7 @@ return manager; } - public ChannelManager() { + private ChannelManager() { channels = new HashMap(); hosts = new HashMap(); rchannels = new HashMap(); @@ -75,15 +76,11 @@ HostCredentialPair hcp = new HostCredentialPair(host, cred); channel = (MetaChannel) channels.get(hcp); if (channel == null) { - channel = new MetaChannel(rm == null ? 
clientRequestManager : rm, - new ChannelContext()); - new Throwable().printStackTrace(); - System.err.println("Creating new meta channel with rm: " - + channel.getRequestManager()); - channel.getChannelContext().setConfiguration( - RemoteConfiguration.getDefault().find(host)); - channel.getChannelContext().setRemoteContact(host); - channel.getChannelContext().setCredential(cred); + ChannelContext context = new ChannelContext(); + context.setConfiguration(RemoteConfiguration.getDefault().find(host)); + context.setRemoteContact(host); + context.setCredential(cred); + channel = new MetaChannel(rm == null ? clientRequestManager : rm, context); registerChannel(hcp, channel); } } @@ -115,10 +112,13 @@ public void registerChannel(String url, GSSCredential cred, KarajanChannel channel) throws ChannelException { synchronized (channels) { - MetaChannel previous = new MetaChannel(channel.getRequestManager(), - channel.getChannelContext()); + HostCredentialPair hcp = new HostCredentialPair(url, cred); + MetaChannel previous = getMetaChannel(channel); + if (previous == null) { + previous = new MetaChannel(channel.getRequestManager(), channel.getChannelContext()); + } + channels.put(hcp, previous); previous.bind(channel); - channels.put(new HostCredentialPair(url, cred), previous); } } @@ -129,20 +129,18 @@ synchronized (channels) { MetaChannel previous = (MetaChannel) channels.get(id); if (previous != null) { - if (logger.isDebugEnabled()) { - logger.debug("Re-registering " + id + " = " + channel); + if (logger.isInfoEnabled()) { + logger.info("Re-registering " + id + " = " + channel); } try { /* * Check to see if a rogue user is not trying to "steal" a * channel */ - if (!previous.getChannelContext().getUserContext().getName().equals( - channel.getChannelContext().getUserContext().getName())) { + + if (!equals(getName(previous), getName(channel))) { throw new ChannelException("Channel registration denied. 
Expected name: " - + previous.getChannelContext().getUserContext().getName() - + "; actual name: " - + channel.getChannelContext().getUserContext().getName()); + + getName(previous) + "; actual name: " + getName(channel)); } } catch (Exception e) { @@ -173,6 +171,15 @@ } } + private boolean equals(Object o1, Object o2) { + return o1 == null ? o2 == null : o1.equals(o2); + } + + private String getName(KarajanChannel channel) { + UserContext uc = channel.getChannelContext().getUserContext(); + return uc == null ? null : uc.getName(); + } + public KarajanChannel reserveChannel(String host, GSSCredential cred, RequestManager rm) throws ChannelException { MetaChannel channel = getClientChannel(host, cred, rm); @@ -187,41 +194,111 @@ } public KarajanChannel reserveChannel(KarajanChannel channel) throws ChannelException { - return reserveChannel(getChannel(channel)); + return reserveChannel(getMetaChannel(channel)); } public KarajanChannel reserveChannel(ChannelContext context) throws ChannelException { - return reserveChannel(getChannel(context)); + return reserveChannel(getMetaChannel(context)); } + private void connect(MetaChannel meta) throws ChannelException { + try { + String contact = meta.getChannelContext().getRemoteContact(); + if (contact == null) { + // Should buffer things for a certain period of time + throw new ChannelException("Channel died and no contact available"); + } + Client client = Client.newClient(contact, meta.getChannelContext(), + clientRequestManager); + channels.put(client.getChannel().getChannelContext().getChannelID(), meta); + meta.bind(client.getChannel()); + + } + catch (ChannelException e) { + throw e; + } + catch (Exception e) { + throw new ChannelException(e); + } + } + public KarajanChannel reserveChannel(MetaChannel meta) throws ChannelException { synchronized (meta) { meta.incUsageCount(); RemoteConfiguration.Entry config = meta.getChannelContext().getConfiguration(); if (meta.isOffline()) { - try { - String contact = 
meta.getChannelContext().getRemoteContact(); - if (contact == null) { - // Should buffer things for a certain period of time - throw new ChannelException("Channel died and no contact available"); - } - Client client = Client.newClient(contact, meta.getChannelContext(), - clientRequestManager); - channels.put(client.getChannel().getChannelContext().getChannelID(), meta); - meta.bind(client.getChannel()); + connect(meta); + } + } + return meta; + } + public void handleChannelException(KarajanChannel channel, Exception e) { + logger.info("Handling channel exception"); + if (channel.isOffline()) { + logger.info("Channel already shut down"); + return; + } + channel.setLocalShutdown(); + ChannelContext ctx = channel.getChannelContext(); + RemoteConfiguration.Entry config = ctx.getConfiguration(); + try { + if (config != null) { + if (config.hasOption(RemoteConfiguration.RECONNECT)) { + buffer(channel); + channel.close(); + asyncReconnect(channel, e); } - catch (ChannelException e) { - throw e; + else { + shutdownChannel(channel); } - catch (Exception e) { - throw new ChannelException(e); - } } + else { + shutdownChannel(channel); + } } - return meta; + catch (Exception e2) { + logger.warn("Failed to shut down channel", e2); + } + logger.info("Channel exception handled"); } + private void asyncReconnect(final KarajanChannel channel, final Exception e) { + final ChannelContext ctx = channel.getChannelContext(); + final RemoteConfiguration.Entry config = ctx.getConfiguration(); + Thread t = new Thread() { + public void run() { + Exception ex = e; + int limit = Integer.MAX_VALUE; + if (config.hasArg(RemoteConfiguration.RECONNECT)) { + limit = Integer.parseInt(config.getArg(RemoteConfiguration.RECONNECT)); + } + while (ctx.getReconnectionAttempts() < limit) { + + try { + connect(getMetaChannel(channel)); + ctx.setReconnectionAttempts(0); + return; + } + catch (ChannelException e2) { + ctx.setReconnectionAttempts(ctx.getReconnectionAttempts() + 1); + ex = e2; + } + try { + 
Thread.sleep((long) (1000 * Math.pow(2, ctx.getReconnectionAttempts()))); + } + catch (InterruptedException e2) { + channel.getChannelContext().getService().irrecoverableChannelError(channel, + e2); + } + } + channel.getChannelContext().getService().irrecoverableChannelError(channel, ex); + } + }; + t.setName("Reconnector"); + t.start(); + } + public void releaseChannel(KarajanChannel channel) { if (channel == null) { return; @@ -238,18 +315,13 @@ } } else { - try { - unregisterChannel((MetaChannel) channel); - } - catch (ChannelException e) { - logger.warn("Exception caught while unregistering channel", e); - } + unregisterChannel((MetaChannel) channel); } } } } - private void registerChannel(Object key, KarajanChannel channel) { + private void registerChannel(Object key, MetaChannel channel) { synchronized (channels) { channels.put(key, channel); List l = (List) rchannels.get(channel); @@ -261,41 +333,55 @@ } } - protected void unregisterChannel(MetaChannel channel) throws ChannelException { - synchronized (channel) { - RemoteConfiguration.Entry config = channel.getChannelContext().getConfiguration(); - if (config.hasOption(RemoteConfiguration.BUFFER)) { - channel.bind(new BufferingChannel(channel.getChannelContext())); - } - else if (config.hasOption(RemoteConfiguration.POLL)) { - if (config.hasArg(RemoteConfiguration.POLL)) { - String time = config.getArg(RemoteConfiguration.POLL); - int itime = Integer.parseInt(time); - channel.poll(itime); + public void unregisterChannel(KarajanChannel channel) throws ChannelException { + unregisterChannel(getMetaChannel(channel)); + } + + protected void unregisterChannel(MetaChannel channel) { + try { + synchronized (channel) { + RemoteConfiguration.Entry config = channel.getChannelContext().getConfiguration(); + if (config != null) { + if (config.hasOption(RemoteConfiguration.BUFFER)) { + channel.bind(new BufferingChannel(channel.getChannelContext())); + } + else if (config.hasOption(RemoteConfiguration.POLL)) { + if 
(config.hasArg(RemoteConfiguration.POLL)) { + String time = config.getArg(RemoteConfiguration.POLL); + int itime = Integer.parseInt(time); + channel.poll(itime); + } + else { + channel.poll(300); + } + channel.bind(new NullChannel()); + } + else { + channel.bind(new NullChannel()); + } } else { - channel.poll(300); + channel.bind(new NullChannel()); } - channel.bind(new NullChannel()); } - else { - channel.bind(new NullChannel()); - } } + catch (ChannelException e) { + logger.error("Exception caught while unregistering channel", e); + } } public void shutdownChannel(KarajanChannel channel) throws ChannelException { - unregisterChannel(getChannel(channel)); + unregisterChannel(getMetaChannel(channel)); } - private MetaChannel getChannel(KarajanChannel channel) throws ChannelException { + private MetaChannel getMetaChannel(KarajanChannel channel) throws ChannelException { if (channel instanceof MetaChannel) { return (MetaChannel) channel; } - return getChannel(channel.getChannelContext()); + return getMetaChannel(channel.getChannelContext()); } - private MetaChannel getChannel(ChannelContext context) throws ChannelException { + private MetaChannel getMetaChannel(ChannelContext context) throws ChannelException { synchronized (channels) { if (logger.isDebugEnabled()) { logger.debug("\nLooking up " + context.getChannelID()); @@ -324,11 +410,11 @@ } public void reserveLongTerm(KarajanChannel channel) throws ChannelException { - getChannel(channel).incLongTermUsageCount(); + getMetaChannel(channel).incLongTermUsageCount(); } public void releaseLongTerm(KarajanChannel channel) throws ChannelException { - getChannel(channel).decLongTermUsageCount(); + getMetaChannel(channel).decLongTermUsageCount(); } private static class HostCredentialPair { @@ -371,4 +457,9 @@ } } + + private void buffer(KarajanChannel channel) throws ChannelException { + MetaChannel meta = getMetaChannel(channel); + meta.bind(new BufferingChannel(channel.getChannelContext())); + } } Modified: 
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSChannel.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSChannel.java 2008-09-16 17:54:08 UTC (rev 2170) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSChannel.java 2008-09-16 17:56:08 UTC (rev 2171) @@ -16,8 +16,10 @@ import org.globus.cog.karajan.workflow.events.EventBus; import org.globus.cog.karajan.workflow.service.FallbackAuthorization; import org.globus.cog.karajan.workflow.service.GSSService; +import org.globus.cog.karajan.workflow.service.RemoteConfiguration; import org.globus.cog.karajan.workflow.service.RequestManager; import org.globus.cog.karajan.workflow.service.UserContext; +import org.globus.cog.karajan.workflow.service.commands.ChannelConfigurationCommand; import org.globus.cog.karajan.workflow.service.commands.ShutdownCommand; import org.globus.gsi.GSIConstants; import org.globus.gsi.gssapi.GSSConstants; @@ -28,7 +30,6 @@ import org.globus.gsi.gssapi.net.GssSocket; import org.globus.gsi.gssapi.net.GssSocketFactory; import org.gridforum.jgss.ExtendedGSSContext; -import org.ietf.jgss.GSSContext; import org.ietf.jgss.GSSCredential; import org.ietf.jgss.GSSManager; @@ -41,8 +42,11 @@ private boolean shuttingDown; private Exception startException; private Replier replier; + private int id; + private static int sid = 1; - public GSSChannel(GssSocket socket, RequestManager requestManager, ChannelContext sc) { + public GSSChannel(GssSocket socket, RequestManager requestManager, ChannelContext sc) + throws IOException { super(requestManager, sc, false); setSocket(socket); this.socket = socket; @@ -56,6 +60,7 @@ } private void init() { + id = sid++; replier = new Replier(this); EventBus.initialize(); } @@ -65,8 +70,13 @@ } public void start() throws ChannelException { + reconnect(); + 
super.start(); + } + + protected void reconnect() throws ChannelException { try { - if (socket == null) { + if (getContact() != null) { HostAuthorization hostAuthz = new HostAuthorization("host"); Authorization authz = new FallbackAuthorization(new Authorization[] { hostAuthz, @@ -79,7 +89,7 @@ GSSManager manager = new GlobusGSSManagerImpl(); ExtendedGSSContext gssContext = (ExtendedGSSContext) manager.createContext(null, - GSSConstants.MECH_OID, cred, cred.getRemainingLifetime()); + GSSConstants.MECH_OID, cred, cred.getRemainingLifetime()); gssContext.requestAnonymity(false); gssContext.requestCredDeleg(false); @@ -89,21 +99,20 @@ URI contact = getContact(); socket = (GssSocket) GssSocketFactory.getDefault().createSocket(contact.getHost(), contact.getPort(), gssContext); - setSocket(socket); socket.setKeepAlive(true); socket.setSoTimeout(0); socket.setWrapMode(GSIConstants.MODE_SSL.intValue()); socket.setAuthorization(authz); + setSocket(socket); logger.info("Connected to " + contact); - this.getChannelContext().setRemoteContact(contact.toString()); + getChannelContext().setRemoteContact(contact.toString()); } } catch (Exception e) { throw new ChannelException("Failed to start channel " + this, e); } - super.start(); } protected void initializeConnection() { @@ -168,7 +177,7 @@ } public String toString() { - return "GSSC-" + getContact(); + return "GSS" + (isClient() ? 
"C" : "S") + "Channel-" + getContact() + "(" + id + ")"; } protected synchronized void ensureCallbackServiceStarted() throws Exception { Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java 2008-09-16 17:54:08 UTC (rev 2170) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java 2008-09-16 17:56:08 UTC (rev 2171) @@ -55,6 +55,8 @@ void setChannelContext(ChannelContext context); boolean isOffline(); + + boolean isStarted(); void unregisterCommand(Command cmd); Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java 2008-09-16 17:54:08 UTC (rev 2170) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java 2008-09-16 17:56:08 UTC (rev 2171) @@ -30,11 +30,11 @@ private boolean shuttingDown; public MetaChannel(ChannelContext channelContext) { - super(null, channelContext); + super(null, channelContext, false); } public MetaChannel(RequestManager requestManager, ChannelContext channelContext) { - super(requestManager, channelContext); + super(requestManager, channelContext, false); } public synchronized void sendTaggedData(int tag, int flags, byte[] data) { @@ -102,12 +102,7 @@ } deactivator = new TimerTask() { public void run() { - try { - ChannelManager.getManager().unregisterChannel(MetaChannel.this); - } - catch (ChannelException e) { - logger.warn("Exception caught while unregistering channel", e); - } + ChannelManager.getManager().unregisterChannel(MetaChannel.this); 
} }; getTimer().schedule(deactivator, (long) seconds * 1000); @@ -164,6 +159,16 @@ return false; } + public boolean isStarted() { + KarajanChannel crt = current; + if (crt != null) { + return crt.isStarted(); + } + else { + return false; + } + } + public void start() throws ChannelException { } } Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/NullChannel.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/NullChannel.java 2008-09-16 17:54:08 UTC (rev 2170) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/NullChannel.java 2008-09-16 17:56:08 UTC (rev 2171) @@ -12,13 +12,21 @@ import org.globus.cog.karajan.workflow.service.UserContext; public class NullChannel extends AbstractKarajanChannel { + private boolean sink; protected NullChannel() { - super(null, null); + super(null, null, false); } + + protected NullChannel(boolean sink) { + super(null, null, false); + this.sink = sink; + } public void sendTaggedData(int i, int flags, byte[] bytes) { - throw new ChannelIOException("Null channel"); + if (!sink) { + throw new ChannelIOException("Null channel"); + } } public UserContext getUserContext() { @@ -36,4 +44,7 @@ public void start() throws ChannelException { } + public boolean isStarted() { + return true; + } } Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/TCPChannel.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/TCPChannel.java 2008-09-16 17:54:08 UTC (rev 2170) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/TCPChannel.java 2008-09-16 17:56:08 UTC (rev 2171) @@ -9,6 +9,7 @@ */ package org.globus.cog.karajan.workflow.service.channels; 
+import java.io.IOException; import java.net.Socket; import java.net.URI; @@ -25,13 +26,19 @@ setName(contact.toString()); } - public TCPChannel(Socket socket, RequestManager requestManager, ChannelContext channelContext) { + public TCPChannel(Socket socket, RequestManager requestManager, ChannelContext channelContext) + throws IOException { super(requestManager, channelContext, false); setSocket(socket); uc = new UserContext(null, channelContext); } public void start() throws ChannelException { + reconnect(); + super.start(); + } + + protected void reconnect() throws ChannelException { try { if (contact != null) { setSocket(new Socket(contact.getHost(), contact.getPort())); @@ -40,7 +47,6 @@ catch (Exception e) { throw new ChannelException("Failed to create socket", e); } - super.start(); } public UserContext getUserContext() { Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/UDPChannel.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/UDPChannel.java 2008-09-16 17:54:08 UTC (rev 2170) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/UDPChannel.java 2008-09-16 17:56:08 UTC (rev 2171) @@ -40,10 +40,11 @@ private ServiceContext sc; private UDPService service; private Map tagSeq; + private boolean started; public UDPChannel(DatagramSocket ds, ChannelContext context, RequestManager rm, UDPService service, InetSocketAddress addr) { - super(rm, context); + super(rm, context, false); this.ds = ds; this.service = service; this.addr = addr.getAddress(); @@ -53,7 +54,7 @@ } public UDPChannel(URI contact, ChannelContext context, RequestManager rm) { - super(rm, context); + super(rm, context, true); this.contact = contact; } @@ -80,6 +81,7 @@ addr = InetAddress.getByName(contact.getHost()); port = contact.getPort(); } + started = true; } catch (Exception e) { 
throw new ChannelException("Failed to start UDP channel", e); @@ -204,4 +206,8 @@ public InetSocketAddress getRemoteAddress() { return new InetSocketAddress(addr, port); } + + public boolean isStarted() { + return started; + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:54:28
|
Revision: 2170 http://cogkit.svn.sourceforge.net/cogkit/?rev=2170&view=rev Author: hategan Date: 2008-09-16 17:54:08 +0000 (Tue, 16 Sep 2008) Log Message: ----------- better handling of channel errors Modified Paths: -------------- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java 2008-09-16 17:51:02 UTC (rev 2169) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java 2008-09-16 17:54:08 UTC (rev 2170) @@ -21,9 +21,12 @@ import java.util.Set; import org.apache.log4j.Logger; +import org.globus.cog.karajan.workflow.service.RemoteConfiguration; import org.globus.cog.karajan.workflow.service.RequestManager; +import org.globus.cog.karajan.workflow.service.commands.ChannelConfigurationCommand; -public abstract class AbstractStreamKarajanChannel extends AbstractKarajanChannel { +public abstract class AbstractStreamKarajanChannel extends AbstractKarajanChannel implements + Purgeable { public static final Logger logger = Logger.getLogger(AbstractStreamKarajanChannel.class); public static final int STATE_IDLE = 0; @@ -40,8 +43,8 @@ private int state, tag, flags, len; protected AbstractStreamKarajanChannel(RequestManager requestManager, - ChannelContext channelContext) { - super(requestManager, channelContext); + ChannelContext channelContext, boolean client) { + super(requestManager, channelContext, client); rhdr = new byte[HEADER_LEN]; } @@ -69,6 +72,33 @@ this.contact = contact; } + protected abstract void reconnect() throws ChannelException; + + protected synchronized void 
handleChannelException(Exception e) { + logger.info("Channel config: " + getChannelContext().getConfiguration()); + ChannelManager.getManager().handleChannelException(this, e); + try { + getSender().purge(this, new NullChannel(true)); + } + catch (IOException e1) { + logger.warn("Failed to purge queued messages", e1); + } + } + + protected void configure() throws Exception { + URI callbackURI = null; + ChannelContext sc = getChannelContext(); + if (sc.getConfiguration().hasOption(RemoteConfiguration.CALLBACK)) { + callbackURI = getCallbackURI(); + } + String remoteID = sc.getChannelID().getRemoteID(); + + ChannelConfigurationCommand ccc = new ChannelConfigurationCommand(sc.getConfiguration(), + callbackURI); + ccc.execute(this); + logger.info("Channel configured"); + } + public synchronized void sendTaggedData(int tag, int flags, byte[] data) { getSender().enqueue(tag, flags, data, this); } @@ -113,31 +143,35 @@ return true; } + public void purge(KarajanChannel channel) throws IOException { + getSender().purge(this, channel); + } + protected void register() { getMultiplexer(FAST).register(this); } private static final int SENDER_COUNT = 1; private static Sender[] sender; - private static int crtSender; + private static int crtSender; private static synchronized Sender getSender() { - if (sender == null) { - sender = new Sender[SENDER_COUNT]; - for (int i = 0; i < SENDER_COUNT; i++) { - sender[i] = new Sender(); - sender[i].start(); - } - } - try { - return sender[crtSender++]; - } - finally { - if (crtSender == SENDER_COUNT) { - crtSender = 0; - } - } - } + if (sender == null) { + sender = new Sender[SENDER_COUNT]; + for (int i = 0; i < SENDER_COUNT; i++) { + sender[i] = new Sender(); + sender[i].start(); + } + } + try { + return sender[crtSender++]; + } + finally { + if (crtSender == SENDER_COUNT) { + crtSender = 0; + } + } + } private static class SendEntry { public final int tag, flags; @@ -163,14 +197,15 @@ shdr = new byte[HEADER_LEN]; } - public synchronized 
void enqueue(int tag, int flags, byte[] data, AbstractStreamKarajanChannel channel) { + public synchronized void enqueue(int tag, int flags, byte[] data, + AbstractStreamKarajanChannel channel) { queue.addLast(new SendEntry(tag, flags, data, channel)); notify(); } public void run() { try { - SendEntry e; + SendEntry e; while (true) { synchronized (this) { while (queue.isEmpty()) { @@ -181,14 +216,22 @@ try { send(e.tag, e.flags, e.data, e.channel.getOutputStream()); } + catch (IOException ex) { + logger.info("Channel IOException", ex); + synchronized (this) { + queue.addFirst(e); + } + e.channel.handleChannelException(ex); + } catch (Exception ex) { - ex.printStackTrace(); - try { - e.channel.getChannelContext().getRegisteredCommand(e.tag).errorReceived(null, ex); - } - catch (Exception exx) { - logger.warn(exx); - } + ex.printStackTrace(); + try { + e.channel.getChannelContext().getRegisteredCommand(e.tag).errorReceived( + null, ex); + } + catch (Exception exx) { + logger.warn(exx); + } } } } @@ -197,11 +240,25 @@ } } + public void purge(KarajanChannel source, KarajanChannel channel) throws IOException { + SendEntry e; + synchronized (this) { + Iterator i = queue.iterator(); + while (i.hasNext()) { + e = (SendEntry) i.next(); + if (e.channel == source) { + channel.sendTaggedData(e.tag, e.flags, e.data); + i.remove(); + } + } + } + } + private void send(int tag, int flags, byte[] data, OutputStream os) throws IOException { pack(shdr, 0, tag); pack(shdr, 4, flags); pack(shdr, 8, data.length); - synchronized(os) { + synchronized (os) { os.write(shdr); os.write(data); os.flush(); @@ -249,6 +306,9 @@ Iterator i = channels.iterator(); while (i.hasNext()) { AbstractStreamKarajanChannel channel = (AbstractStreamKarajanChannel) i.next(); + if (channel.isClosed()) { + i.remove(); + } try { any |= channel.step(); } @@ -265,6 +325,8 @@ while (i.hasNext()) { channels.add(i.next()); } + remove.clear(); + add.clear(); } if (!any) { Thread.sleep(20); @@ -280,7 +342,7 @@ if 
(logger.isDebugEnabled()) { logger.debug("Channel exception caught", e); } - channel.shutdown(); + channel.handleChannelException(e); remove.add(channel); } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:51:22
|
Revision: 2169 http://cogkit.svn.sourceforge.net/cogkit/?rev=2169&view=rev Author: hategan Date: 2008-09-16 17:51:02 +0000 (Tue, 16 Sep 2008) Log Message: ----------- fixed command retries Modified Paths: -------------- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java 2008-09-16 17:49:40 UTC (rev 2168) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java 2008-09-16 17:51:02 UTC (rev 2169) @@ -20,20 +20,22 @@ import org.globus.cog.karajan.workflow.service.ReplyTimeoutException; import org.globus.cog.karajan.workflow.service.RequestReply; import org.globus.cog.karajan.workflow.service.channels.ChannelIOException; +import org.globus.cog.karajan.workflow.service.channels.ChannelManager; import org.globus.cog.karajan.workflow.service.channels.KarajanChannel; public abstract class Command extends RequestReply { private static final Logger logger = Logger.getLogger(Command.class); - + private static final Timer timer; - + static { timer = new Timer(); } - public static final int DEFAULT_REPLY_TIMEOUT = 1000 * 60; - public static final int MAX_RETRIES = 2; + public static final int DEFAULT_REPLY_TIMEOUT = 10000 * 60; + public static final int DEFAULT_MAX_RETRIES = 2; private int replyTimeout = DEFAULT_REPLY_TIMEOUT; + private int maxRetries = DEFAULT_MAX_RETRIES; private Callback cb; private String errorMsg; @@ -86,6 +88,7 @@ public void send() throws ProtocolException { KarajanChannel channel = getChannel(); + logger.info("Sending " + this + " on " + channel); List outData = getOutData(); if (channel == null) { throw new ProtocolException("Unregistered command"); @@ -96,6 +99,9 
@@ logger.debug(ppOutData("CMD")); } try { + if (logger.isInfoEnabled()) { + logger.info(this + " CMD: " + this); + } channel.sendTaggedData(getId(), fin, getOutCmd().getBytes()); if (!fin) { Iterator i = outData.iterator(); @@ -107,7 +113,7 @@ timer.schedule(timeout = new Timeout(), replyTimeout); } catch (ChannelIOException e) { - reexecute(); + reexecute(e.getMessage(), e); } } @@ -142,6 +148,14 @@ this.replyTimeout = replyTimeout; } + public int getMaxRetries() { + return maxRetries; + } + + public void setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + } + public void receiveCompleted() { if (timeout == null) { return; @@ -181,35 +195,44 @@ public void channelClosed() { if (!this.isInDataReceived()) { - reexecute(); + reexecute("Channel closed", null); } } - protected void reexecute() { - getChannel().getChannelContext().reexecute(this); - } - - protected void handleReplyTimeout() { - timeout = null; - if (++retries > MAX_RETRIES) { - errorReceived("Reply timeout", new ReplyTimeoutException()); + protected void reexecute(String message, Exception ex) { + if (++retries > maxRetries) { + errorReceived(message, ex); } else { try { + setChannel(ChannelManager.getManager().reserveChannel( + getChannel().getChannelContext())); send(); } catch (ProtocolException e) { errorReceived(e.getMessage(), e); } + catch (Exception e) { + reexecute(e.getMessage(), ex); + } } } - + + protected void handleReplyTimeout() { + timeout = null; + reexecute("Reply timeout", new ReplyTimeoutException()); + } + private class Timeout extends TimerTask { public void run() { handleReplyTimeout(); } } + public String toString() { + return "Command(" + this.getOutCmd() + ")"; + } + public static interface Callback { void replyReceived(Command cmd); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:49:45
|
Revision: 2168 http://cogkit.svn.sourceforge.net/cogkit/?rev=2168&view=rev Author: hategan Date: 2008-09-16 17:49:40 +0000 (Tue, 16 Sep 2008) Log Message: ----------- added heartbeat command/handler Added Paths: ----------- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/HeartBeatCommand.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/HeartBeatHandler.java Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/HeartBeatCommand.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/HeartBeatCommand.java (rev 0) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/HeartBeatCommand.java 2008-09-16 17:49:40 UTC (rev 2168) @@ -0,0 +1,35 @@ +//---------------------------------------------------------------------- +//This code is developed as part of the Java CoG Kit project +//The terms of the license can be found at http://www.cogkit.org/license +//This message may not be removed or altered. 
+//---------------------------------------------------------------------- + +/* + * Created on Jul 20, 2005 + */ +package org.globus.cog.karajan.workflow.service.commands; + +import org.globus.cog.karajan.workflow.service.ProtocolException; +import org.globus.cog.karajan.workflow.service.handlers.HeartBeatHandler; + + +public class HeartBeatCommand extends Command { + private long start; + private static int sid; + private int id; + + public HeartBeatCommand() { + super(HeartBeatHandler.NAME); + id = sid++; + } + + public void send() throws ProtocolException { + start = System.currentTimeMillis(); + addOutData(String.valueOf(start)); + super.send(); + } + + public void replyReceived(byte[] data) throws ProtocolException { + super.replyReceived(data); + } +} Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/HeartBeatHandler.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/HeartBeatHandler.java (rev 0) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/HeartBeatHandler.java 2008-09-16 17:49:40 UTC (rev 2168) @@ -0,0 +1,21 @@ +//---------------------------------------------------------------------- +//This code is developed as part of the Java CoG Kit project +//The terms of the license can be found at http://www.cogkit.org/license +//This message may not be removed or altered. 
+//---------------------------------------------------------------------- + +/* + * Created on Jul 21, 2005 + */ +package org.globus.cog.karajan.workflow.service.handlers; + +import org.globus.cog.karajan.workflow.service.ProtocolException; + + +public class HeartBeatHandler extends RequestHandler { + public static final String NAME = "HEARTBEAT"; + + public void requestComplete() throws ProtocolException { + sendReply(String.valueOf(System.currentTimeMillis())); + } +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:48:52
|
Revision: 2167 http://cogkit.svn.sourceforge.net/cogkit/?rev=2167&view=rev Author: hategan Date: 2008-09-16 17:48:48 +0000 (Tue, 16 Sep 2008) Log Message: ----------- added heartbeat command/handler Added Paths: ----------- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatCheckTask.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatTask.java Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatCheckTask.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatCheckTask.java (rev 0) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatCheckTask.java 2008-09-16 17:48:48 UTC (rev 2167) @@ -0,0 +1,41 @@ +//---------------------------------------------------------------------- +//This code is developed as part of the Java CoG Kit project +//The terms of the license can be found at http://www.cogkit.org/license +//This message may not be removed or altered. 
+//---------------------------------------------------------------------- + +/* + * Created on Sep 9, 2008 + */ +package org.globus.cog.karajan.workflow.service.channels; + +import java.util.TimerTask; + +import org.apache.log4j.Logger; + +public class HeartBeatCheckTask extends TimerTask { + public static final Logger logger = Logger.getLogger(HeartBeatCheckTask.class); + + private KarajanChannel channel; + private int multiplier, interval; + + public HeartBeatCheckTask(KarajanChannel channel, int interval, int multiplier) { + this.channel = channel; + this.multiplier = multiplier; + this.interval = interval; + channel.getChannelContext().setLastHeartBeat(System.currentTimeMillis()); + } + + public void run() { + if (channel.isOffline()) { + this.cancel(); + } + else if (channel.isStarted()) { + if (channel.getChannelContext().getLastHeartBeat() - interval * multiplier > System.currentTimeMillis()) { + logger.warn("Channel (" + channel + ") has not received any heartbeat in " + + multiplier + " intervals. Shutting it down."); + channel.shutdown(); + } + } + } +} Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatTask.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatTask.java (rev 0) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatTask.java 2008-09-16 17:48:48 UTC (rev 2167) @@ -0,0 +1,54 @@ +//---------------------------------------------------------------------- +//This code is developed as part of the Java CoG Kit project +//The terms of the license can be found at http://www.cogkit.org/license +//This message may not be removed or altered. 
+//---------------------------------------------------------------------- + +/* + * Created on Sep 9, 2008 + */ +package org.globus.cog.karajan.workflow.service.channels; + +import java.util.TimerTask; + +import org.apache.log4j.Logger; +import org.globus.cog.karajan.workflow.service.ProtocolException; +import org.globus.cog.karajan.workflow.service.commands.Command; +import org.globus.cog.karajan.workflow.service.commands.HeartBeatCommand; +import org.globus.cog.karajan.workflow.service.commands.Command.Callback; + +public class HeartBeatTask extends TimerTask implements Callback { + public static final Logger logger = Logger.getLogger(HeartBeatTask.class); + + private KarajanChannel channel; + + public HeartBeatTask(KarajanChannel channel) { + this.channel = channel; + } + + public void run() { + if (channel.isOffline()) { + this.cancel(); + } + else if (channel.isStarted()) { + HeartBeatCommand hbc = new HeartBeatCommand(); + try { + hbc.executeAsync(channel, this); + } + catch (ProtocolException e) { + this.cancel(); + logger.error("Protocol error caught while trying to send heartbeat. " + + "Suspending heartbeats for this channel.", e); + } + } + } + + public void errorReceived(Command cmd, String msg, Exception t) { + // if it's a channel error, the channel should have figured it out + logger.warn("Heartbeat failed: " + msg, t); + } + + public void replyReceived(Command cmd) { + // we're good + } +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:47:47
|
Revision: 2166 http://cogkit.svn.sourceforge.net/cogkit/?rev=2166&view=rev Author: hategan Date: 2008-09-16 17:47:44 +0000 (Tue, 16 Sep 2008) Log Message: ----------- log resources when added Modified Paths: -------------- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2008-09-16 17:47:11 UTC (rev 2165) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/scheduler/AbstractScheduler.java 2008-09-16 17:47:44 UTC (rev 2166) @@ -119,6 +119,9 @@ } public void setResources(ContactSet grid) { + if (logger.isInfoEnabled()) { + logger.info("Setting resources to: " + grid); + } this.grid = grid; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:47:14
|
Revision: 2165 http://cogkit.svn.sourceforge.net/cogkit/?rev=2165&view=rev Author: hategan Date: 2008-09-16 17:47:11 +0000 (Tue, 16 Sep 2008) Log Message: ----------- added toString Modified Paths: -------------- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/util/ContactSet.java Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/util/ContactSet.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/util/ContactSet.java 2008-09-16 17:42:21 UTC (rev 2164) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/util/ContactSet.java 2008-09-16 17:47:11 UTC (rev 2165) @@ -66,4 +66,8 @@ contacts.put(contact.getHost(), contact); } } + + public String toString() { + return contacts.toString(); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:42:24
|
Revision: 2164 http://cogkit.svn.sourceforge.net/cogkit/?rev=2164&view=rev Author: hategan Date: 2008-09-16 17:42:21 +0000 (Tue, 16 Sep 2008) Log Message: ----------- ... Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-09-16 17:41:22 UTC (rev 2163) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/CoasterService.java 2008-09-16 17:42:21 UTC (rev 2164) @@ -19,8 +19,10 @@ import org.globus.cog.abstraction.impl.execution.coaster.NotificationManager; import org.globus.cog.karajan.workflow.service.ConnectionHandler; import org.globus.cog.karajan.workflow.service.GSSService; +import org.globus.cog.karajan.workflow.service.RemoteConfiguration; import org.globus.cog.karajan.workflow.service.RequestManager; import org.globus.cog.karajan.workflow.service.ServiceRequestManager; +import org.globus.cog.karajan.workflow.service.channels.ChannelException; import org.globus.cog.karajan.workflow.service.channels.ChannelManager; import org.globus.cog.karajan.workflow.service.channels.KarajanChannel; import org.globus.gsi.gssapi.auth.SelfAuthorization; @@ -30,7 +32,7 @@ .getLogger(CoasterService.class); public static final int IDLE_TIMEOUT = 120 * 1000; - + public static final RequestManager COASTER_REQUEST_MANAGER = new CoasterRequestManager(); private String registrationURL, id; @@ -91,8 +93,11 @@ if (id != null) { try { logger.info("Reserving channel for registration"); + RemoteConfiguration.getDefault().prepend( + 
getChannelConfiguration(registrationURL)); KarajanChannel channel = ChannelManager.getManager() - .reserveChannel(registrationURL, null, COASTER_REQUEST_MANAGER); + .reserveChannel(registrationURL, null, + COASTER_REQUEST_MANAGER); channel.getChannelContext().setService(this); logger.info("Sending registration"); RegistrationCommand reg = new RegistrationCommand(id, @@ -133,6 +138,10 @@ } } + public void irrecoverableChannelError(KarajanChannel channel, Exception e) { + stop(e); + } + private synchronized void checkIdleTime() { // the notification manager should probably not be a singleton long idleTime = NotificationManager.getDefault().getIdleTime(); @@ -152,15 +161,15 @@ public synchronized void suspend() { this.suspended = true; } - + public synchronized boolean isSuspended() { return suspended; } - + public synchronized void resume() { this.suspended = false; } - + public void shutdown() { super.shutdown(); jobQueue.getWorkerManager().shutdown(); @@ -171,6 +180,12 @@ return jobQueue; } + protected RemoteConfiguration.Entry getChannelConfiguration(String contact) { + return new RemoteConfiguration.Entry( + contact.replaceAll("\\.", "\\."), + "KEEPALIVE, RECONNECT(8), HEARTBEAT(300)"); + } + public static void main(String[] args) { try { CoasterService s; @@ -188,5 +203,9 @@ e.printStackTrace(); System.exit(1); } + catch (Throwable t) { + t.printStackTrace(); + System.exit(2); + } } } Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java 2008-09-16 17:41:22 UTC (rev 2163) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/LocalTCPService.java 2008-09-16 17:42:21 UTC (rev 2164) @@ -62,6 +62,4 @@ } super.handleConnection(socket); } - - } This was sent by the 
SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:41:25
|
Revision: 2163 http://cogkit.svn.sourceforge.net/cogkit/?rev=2163&view=rev Author: hategan Date: 2008-09-16 17:41:22 +0000 (Tue, 16 Sep 2008) Log Message: ----------- ... Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-09-16 17:40:55 UTC (rev 2162) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/job/manager/WorkerManager.java 2008-09-16 17:41:22 UTC (rev 2163) @@ -351,6 +351,7 @@ } public void workerTerminated(Worker worker) { + logger.warn("Worker terminated: " + worker); Status s = worker.getStatus(); if (s.getStatusCode() == Status.FAILED) { synchronized (this) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:40:58
|
Revision: 2162 http://cogkit.svn.sourceforge.net/cogkit/?rev=2162&view=rev Author: hategan Date: 2008-09-16 17:40:55 +0000 (Tue, 16 Sep 2008) Log Message: ----------- ... Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalRequestManager.java Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java 2008-09-16 17:39:15 UTC (rev 2161) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/JobStatusHandler.java 2008-09-16 17:40:55 UTC (rev 2162) @@ -9,6 +9,7 @@ */ package org.globus.cog.abstraction.coaster.service.local; +import org.apache.log4j.Logger; import org.globus.cog.abstraction.impl.common.StatusImpl; import org.globus.cog.abstraction.impl.common.execution.JobException; import org.globus.cog.abstraction.impl.execution.coaster.NotificationManager; @@ -17,6 +18,8 @@ import org.globus.cog.karajan.workflow.service.handlers.RequestHandler; public class JobStatusHandler extends RequestHandler { + public static final Logger logger = Logger.getLogger(JobStatusHandler.class); + public static final String NAME = "JOBSTATUS"; public void requestComplete() throws ProtocolException { @@ -35,6 +38,7 @@ if (message != null && !message.equals("")) { s.setMessage(message); } + logger.error("Job " + jobId + " is " + status); NotificationManager.getDefault().notificationReceived(jobId, s); sendReply("OK"); } Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalRequestManager.java 
=================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalRequestManager.java 2008-09-16 17:39:15 UTC (rev 2161) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalRequestManager.java 2008-09-16 17:40:55 UTC (rev 2162) @@ -11,13 +11,17 @@ import org.globus.cog.karajan.workflow.service.AbstractRequestManager; import org.globus.cog.karajan.workflow.service.handlers.ChannelConfigurationHandler; +import org.globus.cog.karajan.workflow.service.handlers.HeartBeatHandler; -public class LocalRequestManager extends AbstractRequestManager { +public class LocalRequestManager extends AbstractRequestManager { + public static final LocalRequestManager INSTANCE = new LocalRequestManager(); + public LocalRequestManager() { addHandler(VersionHandler.NAME, VersionHandler.class); addHandler(RegistrationHandler.NAME, RegistrationHandler.class); addHandler(UnregisterHandler.NAME, UnregisterHandler.class); addHandler("CHANNELCONFIG", ChannelConfigurationHandler.class); addHandler(JobStatusHandler.NAME, JobStatusHandler.class); + addHandler(HeartBeatHandler.NAME, HeartBeatHandler.class); } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:39:20
|
Revision: 2161 http://cogkit.svn.sourceforge.net/cogkit/?rev=2161&view=rev Author: hategan Date: 2008-09-16 17:39:15 +0000 (Tue, 16 Sep 2008) Log Message: ----------- ... Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java 2008-09-16 17:38:10 UTC (rev 2160) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/coaster/service/local/LocalService.java 2008-09-16 17:39:15 UTC (rev 2161) @@ -12,16 +12,18 @@ import java.io.IOException; import java.net.Socket; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.log4j.Logger; import org.globus.cog.abstraction.coaster.service.Registering; import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException; -import org.globus.cog.abstraction.impl.execution.coaster.CoasterChannelManager; import org.globus.cog.abstraction.interfaces.Status; import org.globus.cog.abstraction.interfaces.Task; import org.globus.cog.karajan.workflow.service.ConnectionHandler; import org.globus.cog.karajan.workflow.service.GSSService; +import org.globus.cog.karajan.workflow.service.channels.ChannelManager; import org.globus.cog.karajan.workflow.service.channels.KarajanChannel; import org.globus.gsi.gssapi.auth.SelfAuthorization; @@ -56,8 +58,10 @@ logger.debug("Got connection"); try { ConnectionHandler handler = new ConnectionHandler(this, sock, - new LocalRequestManager()); + LocalRequestManager.INSTANCE); + logger.info("Initialized connection handler"); handler.start(); + logger.info("Connection handler started"); } catch (Exception e) { 
logger.warn("Could not start connection handler", e); @@ -77,7 +81,7 @@ heardOf(id); synchronized (services) { while (!services.containsKey(id)) { - services.wait(1000); + services.wait(250); if (timeout < System.currentTimeMillis() - lastHeardOf(id)) { throw new TaskSubmissionException( "Timed out waiting for registration for " + id); @@ -116,24 +120,19 @@ } synchronized (services) { if (services.containsKey(id)) { - throw new IllegalArgumentException( - "Another registration with the same id (" + id - + ") already exists"); + logger.info("Replacing channel for service with id=" + id + + "."); } - else { - try { - CoasterChannelManager.getManager() - .registerChannel(url, - channel.getUserContext().getCredential(), - channel); - } - catch (Exception e) { - throw new RuntimeException("Failed to register channel " - + url); - } - services.put(id, url); - services.notifyAll(); + try { + ChannelManager.getManager().registerChannel(url, + channel.getUserContext().getCredential(), channel); } + catch (Exception e) { + throw new RuntimeException("Failed to register channel " + + url); + } + services.put(id, url); + services.notifyAll(); } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:38:13
|
Revision: 2160 http://cogkit.svn.sourceforge.net/cogkit/?rev=2160&view=rev Author: hategan Date: 2008-09-16 17:38:10 +0000 (Tue, 16 Sep 2008) Log Message: ----------- cleanup fixes Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-09-16 17:36:55 UTC (rev 2159) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/ServiceManager.java 2008-09-16 17:38:10 UTC (rev 2160) @@ -41,7 +41,10 @@ import org.globus.cog.abstraction.interfaces.StatusListener; import org.globus.cog.abstraction.interfaces.Task; import org.globus.cog.abstraction.interfaces.TaskHandler; +import org.globus.cog.karajan.workflow.service.channels.ChannelManager; import org.globus.cog.karajan.workflow.service.channels.KarajanChannel; +import org.globus.cog.karajan.workflow.service.commands.Command; +import org.globus.cog.karajan.workflow.service.commands.Command.Callback; import org.ietf.jgss.GSSCredential; public class ServiceManager implements StatusListener { @@ -177,12 +180,14 @@ } } try { - GSSCredential cred = (GSSCredential) t.getService(0) - .getSecurityContext().getCredentials(); - KarajanChannel channel = CoasterChannelManager.getManager() - .getExistingChannel(url, cred); - if (channel != null) { - channel.close(); + if (url != null) { + GSSCredential cred = (GSSCredential) t.getService(0) + .getSecurityContext().getCredentials(); + KarajanChannel channel = ChannelManager.getManager() + .getExistingChannel(url, cred); + if (channel != null) { + channel.close(); + } } } catch (Exception e) { @@ -307,34 +312,67 @@ catch 
(NoSuchAlgorithmException e) { r = (int) Math.random() * Integer.MAX_VALUE; } - return String.valueOf(r); + if (r < 0) { + return '0' + String.valueOf(-r); + } + else { + return '1' + String.valueOf(r); + } } - private class ServiceReaper extends Thread { + private class ServiceReaper extends Thread implements Callback { + private int count; public ServiceReaper() { setName("Coaster service reaper"); } public void run() { + System.out.println("Cleaning up..."); Iterator i = services.values().iterator(); + count = services.size(); while (i.hasNext()) { String url = (String) i.next(); Object cred = credentials.get(url); try { - KarajanChannel channel = CoasterChannelManager - .getManager().reserveChannel(url, - (GSSCredential) cred); + System.out.println("Shutting down service at " + url); + KarajanChannel channel = ChannelManager.getManager() + .reserveChannel(url, (GSSCredential) cred); + System.out.println("Got channel " + channel); ServiceShutdownCommand ssc = new ServiceShutdownCommand(); - ssc.execute(channel); - CoasterChannelManager.getManager() - .releaseChannel(channel); + ssc.setReplyTimeout(10); + ssc.setMaxRetries(0); + ssc.executeAsync(channel, this); + ChannelManager.getManager().releaseChannel(channel); } catch (Exception e) { logger.warn("Failed to shut down service " + url, e); } } + synchronized (this) { + while (count > 0) { + try { + wait(100); + } + catch (InterruptedException e) { + return; + } + } + } + System.out.println(" Done"); } + public synchronized void errorReceived(Command cmd, String msg, + Exception t) { + System.out.print("-"); + count--; + notifyAll(); + } + + public synchronized void replyReceived(Command cmd) { + System.out.print("+"); + count--; + notifyAll(); + } } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:36:59
|
Revision: 2159 http://cogkit.svn.sourceforge.net/cogkit/?rev=2159&view=rev Author: hategan Date: 2008-09-16 17:36:55 +0000 (Tue, 16 Sep 2008) Log Message: ----------- better logging Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java 2008-09-16 17:35:52 UTC (rev 2158) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/Bootstrap.java 2008-09-16 17:36:55 UTC (rev 2159) @@ -13,18 +13,22 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.PrintStream; import java.lang.reflect.Method; import java.net.URL; import java.net.URLClassLoader; import java.security.MessageDigest; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Date; import java.util.Iterator; import java.util.List; public class Bootstrap { public static final boolean fork = true; - private static final File cacheDir = new File(System + private static final File CACHE_DIR = new File(System .getProperty("user.home") + File.separator + ".globus" @@ -32,7 +36,13 @@ + "coasters" + File.separator + "cache"); + private static final File LOG_DIR = new File(System + .getProperty("user.home") + + File.separator + ".globus" + File.separator + "coasters"); + public static final String SERVICE_CLASS = "org.globus.cog.abstraction.coaster.service.CoasterService"; + + private static Logger logger; private String serviceURL; private String listChecksum; @@ -47,6 +57,7 @@ this.registrationURL = registrationURL; this.serviceId = serviceId; list = new 
ArrayList(); + logger = new Logger(serviceId); } public void run() throws Exception { @@ -56,7 +67,7 @@ } private void getList() throws Exception { - System.out.println("Fetching file list"); + logger.log("Fetching file list"); StringBuffer line = new StringBuffer(); URL url = new URL(serviceURL + "/list?serviceId=" + serviceId); InputStream is = url.openStream(); @@ -90,14 +101,16 @@ } private void updateJars() throws Exception { - System.out.println("Updating jars"); - cacheDir.mkdirs(); + logger.log("Updating jars"); + if (!CACHE_DIR.mkdirs() && !CACHE_DIR.exists()) { + error("Could not create jar cache directory"); + } Iterator i = list.iterator(); while (i.hasNext()) { String[] jar = (String[]) i.next(); - File f = new File(cacheDir, buildName(jar)); + File f = new File(CACHE_DIR, buildName(jar)); if (!f.exists()) { - download(cacheDir, jar[0], jar[1]); + download(CACHE_DIR, jar[0], jar[1]); } } } @@ -105,9 +118,10 @@ private void download(File dir, String name, String checksum) throws Exception { try { - System.out.println("Downloading " + name); + logger.log("Downloading " + name); File dest = File.createTempFile("download-", ".jar", dir); - URL url = new URL(serviceURL + "/" + name + "?serviceId=" + serviceId); + URL url = new URL(serviceURL + "/" + name + "?serviceId=" + + serviceId); InputStream is = url.openStream(); FileOutputStream fos = new FileOutputStream(dest); byte[] buf = new byte[16384]; @@ -138,7 +152,7 @@ clStart(); } } - + private void arrangeJars() { String[] coasterJar = null; Iterator i = list.iterator(); @@ -150,18 +164,18 @@ } } if (coasterJar != null) { - System.out.println("Moved coaster jar to head of classpath"); + logger.log("Moved coaster jar to head of classpath"); list.add(0, coasterJar); } } private void forkStart() throws Exception { - System.out.println("Forking service"); + logger.log("Forking service"); StringBuffer sb = new StringBuffer(); arrangeJars(); Iterator i = list.iterator(); while (i.hasNext()) { - 
sb.append(cacheDir.getAbsolutePath()); + sb.append(CACHE_DIR.getAbsolutePath()); sb.append('/'); sb.append(buildName((String[]) i.next())); if (i.hasNext()) { @@ -179,12 +193,13 @@ args.add(SERVICE_CLASS); args.add(registrationURL); args.add(serviceId); - System.out.println("Args: " + args); - Process p = Runtime.getRuntime().exec((String[]) args.toArray(new String[0])); + logger.log("Args: " + args); + Process p = Runtime.getRuntime().exec( + (String[]) args.toArray(new String[0])); StringBuffer out = new StringBuffer(), err = new StringBuffer(); - System.out.println("Starting stdout consumer"); + logger.log("Starting stdout consumer"); consumeOutput(p.getInputStream(), out); - System.out.println("Starting stderr consumer"); + logger.log("Starting stderr consumer"); consumeOutput(p.getErrorStream(), err); int ec; while (true) { @@ -196,19 +211,20 @@ Thread.sleep(250); } } - System.out.println("Exit code: " + ec); + logger.log("Exit code: " + ec); System.out.println(out.toString()); System.err.println(err.toString()); if (ec != 0) { System.exit(ec); } } - + private void addDebuggingOptions(List args) { //args.add("-Xdebug"); //args.add("-Xrunjdwp:transport=dt_socket,address=8788,server=y,suspend=y"); + //args.add("-Xrunjdwp:transport=dt_socket,address=8788,server=y,suspend=n"); } - + private void addProperties(List args) { addProperty(args, "X509_USER_PROXY"); addProperty(args, "GLOBUS_HOSTNAME"); @@ -216,7 +232,7 @@ addProperty(args, "X509_CERT_DIR"); args.add("-Djava.security.egd=file:///dev/urandom"); } - + private void addProperty(List args, String name) { String value = System.getProperty(name); if (value != null && !value.equals("")) { @@ -249,7 +265,7 @@ private void clStart() throws Exception { URL[] urls = new URL[list.size()]; for (int i = 0; i < list.size(); i++) { - urls[i] = new URL("file://" + cacheDir.getAbsolutePath() + "/" + urls[i] = new URL("file://" + CACHE_DIR.getAbsolutePath() + "/" + buildName((String[]) list.get(i))); 
System.err.println(urls[i]); } @@ -258,7 +274,8 @@ Class cls = cl.loadClass(SERVICE_CLASS); Method m = cls.getMethod("main", new Class[] { String[].class }); - m.invoke(null, new Object[] { new String[] { registrationURL, serviceId } }); + m.invoke(null, new Object[] { new String[] { registrationURL, + serviceId } }); } private String buildName(String[] n) { @@ -276,7 +293,6 @@ if (args.length != 4) { error("Wrong number of arguments. Expected <serviceURL>, <package list checksum>, <registration service URL>, and <id>"); } - System.out.println("Starting service..."); try { Bootstrap b = new Bootstrap(args[0], args[1], args[2], args[3]); b.run(); @@ -287,7 +303,50 @@ } private static void error(String message) { + if (logger != null) { + logger.log(message); + } System.err.println(message); System.exit(1); } + + private static class Logger { + private PrintStream ps; + private String id; + private static final DateFormat DF = new SimpleDateFormat( + "yyyy-MM-dd HH:mm:ss,SSS"); + + public Logger(String id) { + this.id = " " + id + " "; + if (!LOG_DIR.mkdirs() && !LOG_DIR.exists()) { + error("Cannot create coaster directory (" + LOG_DIR + ")"); + } + try { + ps = new PrintStream(new FileOutputStream(LOG_DIR + .getAbsolutePath() + + File.separator + "coasters.log", true)); + } + catch (IOException e) { + error("Cannot create coaster log file: " + e.getMessage()); + } + } + + public void log(String message) { + header(); + ps.println(message); + ps.flush(); + } + + public void log(String message, Throwable t) { + header(); + ps.println(message); + t.printStackTrace(ps); + ps.flush(); + } + + private void header() { + ps.print(DF.format(new Date())); + ps.print(id); + } + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:35:56
|
Revision: 2158 http://cogkit.svn.sourceforge.net/cogkit/?rev=2158&view=rev Author: hategan Date: 2008-09-16 17:35:52 +0000 (Tue, 16 Sep 2008) Log Message: ----------- remove coaster channel manager Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java Removed Paths: ------------- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/CoasterChannelManager.java Deleted: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/CoasterChannelManager.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/CoasterChannelManager.java 2008-09-16 17:34:11 UTC (rev 2157) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/CoasterChannelManager.java 2008-09-16 17:35:52 UTC (rev 2158) @@ -1,29 +0,0 @@ -//---------------------------------------------------------------------- -//This code is developed as part of the Java CoG Kit project -//The terms of the license can be found at http://www.cogkit.org/license -//This message may not be removed or altered. 
-//---------------------------------------------------------------------- - -/* - * Created on Feb 13, 2008 - */ -package org.globus.cog.abstraction.impl.execution.coaster; - -import org.globus.cog.abstraction.coaster.service.local.LocalRequestManager; -import org.globus.cog.karajan.workflow.service.channels.ChannelManager; - -public class CoasterChannelManager extends ChannelManager { - private static ChannelManager manager; - - public synchronized static ChannelManager getManager() { - if (manager == null) { - manager = new CoasterChannelManager(); - } - return manager; - } - - public CoasterChannelManager() { - super(); - setClientRequestManager(new LocalRequestManager()); - } -} Modified: trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java =================================================================== --- trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-09-16 17:34:11 UTC (rev 2157) +++ trunk/current/src/cog/modules/provider-coaster/src/org/globus/cog/abstraction/impl/execution/coaster/JobSubmissionTaskHandler.java 2008-09-16 17:35:52 UTC (rev 2158) @@ -10,6 +10,7 @@ import java.util.Map; import org.apache.log4j.Logger; +import org.globus.cog.abstraction.coaster.service.local.LocalRequestManager; import org.globus.cog.abstraction.impl.common.AbstractionFactory; import org.globus.cog.abstraction.impl.common.ProviderMethodException; import org.globus.cog.abstraction.impl.common.StatusImpl; @@ -31,6 +32,7 @@ import org.globus.cog.abstraction.interfaces.Status; import org.globus.cog.abstraction.interfaces.Task; import org.globus.cog.abstraction.interfaces.TaskHandler; +import org.globus.cog.karajan.workflow.service.channels.ChannelManager; import org.globus.cog.karajan.workflow.service.channels.KarajanChannel; import org.globus.cog.karajan.workflow.service.commands.Command; import 
org.globus.cog.karajan.workflow.service.commands.Command.Callback; @@ -72,8 +74,8 @@ else { url = task.getService(0).getServiceContact().getContact(); } - KarajanChannel channel = CoasterChannelManager.getManager() - .reserveChannel(url, cred); + KarajanChannel channel = ChannelManager.getManager() + .reserveChannel(url, cred, LocalRequestManager.INSTANCE); jsc = new SubmitJobCommand(task); jsc.executeAsync(channel, this); } @@ -104,8 +106,10 @@ } String[] jmp = jm.split(":"); if (jmp.length < 2) { - throw new InvalidServiceContactException("Invalid job manager: " - + jm + ". Use <provider>:<remote-provider>[:<remote-job-manager>]."); + throw new InvalidServiceContactException( + "Invalid job manager: " + + jm + + ". Use <provider>:<remote-provider>[:<remote-job-manager>]."); } return jmp[0]; } @@ -119,7 +123,8 @@ return ((ExecutionService) s).getJobManager(); } else { - throw new IllegalSpecException("Service must be an ExecutionService"); + throw new IllegalSpecException( + "Service must be an ExecutionService"); } } @@ -135,8 +140,8 @@ TaskSubmissionException { try { if (jobid != null) { - KarajanChannel channel = CoasterChannelManager.getManager() - .reserveChannel(url, cred); + KarajanChannel channel = ChannelManager.getManager() + .reserveChannel(url, cred, LocalRequestManager.INSTANCE); CancelJobCommand cc = new CancelJobCommand(jobid); cc.execute(channel); cancel = false; @@ -211,22 +216,25 @@ t.setType(Task.JOB_SUBMISSION); JobSpecification js = new JobSpecificationImpl(); js.setExecutable("/bin/echo"); - //js.addArgument("0"); + // js.addArgument("0"); t.setSpecification(js); ExecutionService s = new ExecutionServiceImpl(); // s.setServiceContact(new ServiceContactImpl("localhost")); - //s.setServiceContact(new ServiceContactImpl("tp-grid1.ci.uchicago.edu")); - s.setServiceContact(new ServiceContactImpl("tg-grid1.uc.teragrid.org")); + // s.setServiceContact(new + // ServiceContactImpl("tp-grid1.ci.uchicago.edu")); + s + .setServiceContact(new 
ServiceContactImpl( + "tg-grid1.uc.teragrid.org")); // s.setServiceContact(new ServiceContactImpl("localhost:50013")); s.setProvider("coaster"); - //s.setJobManager("local:local"); + // s.setJobManager("local:local"); s.setJobManager("gt2:pbs"); s.setSecurityContext(new SecurityContextImpl()); t.setService(0, s); // JobSubmissionTaskHandler th = new JobSubmissionTaskHandler( // AbstractionFactory.newExecutionTaskHandler("local")); JobSubmissionTaskHandler th = new JobSubmissionTaskHandler(); - //th.setAutostart(true); + // th.setAutostart(true); th.submit(t); return t; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ha...@us...> - 2008-09-16 10:34:14
|
Revision: 2157 http://cogkit.svn.sourceforge.net/cogkit/?rev=2157&view=rev Author: hategan Date: 2008-09-16 17:34:11 +0000 (Tue, 16 Sep 2008) Log Message: ----------- updated service log4j.properties Modified Paths: -------------- trunk/current/src/cog/modules/provider-coaster/resources/log4j.properties Modified: trunk/current/src/cog/modules/provider-coaster/resources/log4j.properties =================================================================== --- trunk/current/src/cog/modules/provider-coaster/resources/log4j.properties 2008-09-16 17:33:14 UTC (rev 2156) +++ trunk/current/src/cog/modules/provider-coaster/resources/log4j.properties 2008-09-16 17:34:11 UTC (rev 2157) @@ -1,19 +1,25 @@ # Set root category priority to WARN and its only appender to CONSOLE. -log4j.rootCategory=WARN, CONSOLE +log4j.rootCategory=WARN, FILE # A1 is set to be a ConsoleAppender. #log4j.appender.DEBUG=org.apache.log4j.ConsoleAppender +log4j.appender.FILE=org.apache.log4j.FileAppender +log4j.appender.FILE.File=${user.home}/.globus/coasters/coasters.log +log4j.appender.FILE.layout=org.apache.log4j.PatternLayout +log4j.appender.FILE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSSZZZZZ} %-5p %c{1} %m%n + + # CONSOLE is set to be a ConsoleAppender using a PatternLayout. 
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +#log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +#log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout #log4j.appender.CONSOLE.Threshold=WARN -log4j.appender.CONSOLE.layout.ConversionPattern=[%C{1}] %-5p %t %x - %m%n +#log4j.appender.CONSOLE.layout.ConversionPattern=%-5p %x - %m%n #log4j.appender.CONSOLE.layout.ConversionPattern=%-5p [%C{1}] %x - %m%n log4j.logger.org.globus.cog.abstraction=WARN log4j.logger.org.apache.axis.utils.JavaUtils=ERROR log4j.logger.org.globus.cog.abstraction.impl.execution.coaster.JobSubmissionTaskHandler=INFO -log4j.logger.org.globus.cog.karajan.workflow.service=DEBUG +log4j.logger.org.globus.cog.karajan.workflow.service=INFO log4j.logger.org.globus.cog.abstraction.coaster=INFO \ No newline at end of file This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |