|
From: <ha...@us...> - 2008-09-16 10:48:52
|
Revision: 2167
http://cogkit.svn.sourceforge.net/cogkit/?rev=2167&view=rev
Author: hategan
Date: 2008-09-16 17:48:48 +0000 (Tue, 16 Sep 2008)
Log Message:
-----------
added heartbeat command/handler
Added Paths:
-----------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatCheckTask.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatTask.java
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatCheckTask.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatCheckTask.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatCheckTask.java 2008-09-16 17:48:48 UTC (rev 2167)
@@ -0,0 +1,41 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Sep 9, 2008
+ */
+package org.globus.cog.karajan.workflow.service.channels;
+
+import java.util.TimerTask;
+
+import org.apache.log4j.Logger;
+
+public class HeartBeatCheckTask extends TimerTask {
+ public static final Logger logger = Logger.getLogger(HeartBeatCheckTask.class);
+
+ private KarajanChannel channel;
+ private int multiplier, interval;
+
+ public HeartBeatCheckTask(KarajanChannel channel, int interval, int multiplier) {
+ this.channel = channel;
+ this.multiplier = multiplier;
+ this.interval = interval;
+ channel.getChannelContext().setLastHeartBeat(System.currentTimeMillis());
+ }
+
+ public void run() {
+ if (channel.isOffline()) {
+ this.cancel();
+ }
+ else if (channel.isStarted()) {
+ if (channel.getChannelContext().getLastHeartBeat() - interval * multiplier > System.currentTimeMillis()) {
+ logger.warn("Channel (" + channel + ") has not received any heartbeat in "
+ + multiplier + " intervals. Shutting it down.");
+ channel.shutdown();
+ }
+ }
+ }
+}
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatTask.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatTask.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/HeartBeatTask.java 2008-09-16 17:48:48 UTC (rev 2167)
@@ -0,0 +1,54 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Sep 9, 2008
+ */
+package org.globus.cog.karajan.workflow.service.channels;
+
+import java.util.TimerTask;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.karajan.workflow.service.ProtocolException;
+import org.globus.cog.karajan.workflow.service.commands.Command;
+import org.globus.cog.karajan.workflow.service.commands.HeartBeatCommand;
+import org.globus.cog.karajan.workflow.service.commands.Command.Callback;
+
+public class HeartBeatTask extends TimerTask implements Callback {
+ public static final Logger logger = Logger.getLogger(HeartBeatTask.class);
+
+ private KarajanChannel channel;
+
+ public HeartBeatTask(KarajanChannel channel) {
+ this.channel = channel;
+ }
+
+ public void run() {
+ if (channel.isOffline()) {
+ this.cancel();
+ }
+ else if (channel.isStarted()) {
+ HeartBeatCommand hbc = new HeartBeatCommand();
+ try {
+ hbc.executeAsync(channel, this);
+ }
+ catch (ProtocolException e) {
+ this.cancel();
+ logger.error("Protocol error caught while trying to send heartbeat. " +
+ "Suspending heartbeats for this channel.", e);
+ }
+ }
+ }
+
+ public void errorReceived(Command cmd, String msg, Exception t) {
+ // if it's a channel error, the channel should have figured it out
+ logger.warn("Heartbeat failed: " + msg, t);
+ }
+
+ public void replyReceived(Command cmd) {
+ // we're good
+ }
+}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|