From: <ha...@us...> - 2008-09-16 10:51:22
|
Revision: 2169 http://cogkit.svn.sourceforge.net/cogkit/?rev=2169&view=rev Author: hategan Date: 2008-09-16 17:51:02 +0000 (Tue, 16 Sep 2008) Log Message: ----------- fixed command retries Modified Paths: -------------- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java 2008-09-16 17:49:40 UTC (rev 2168) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java 2008-09-16 17:51:02 UTC (rev 2169) @@ -20,20 +20,22 @@ import org.globus.cog.karajan.workflow.service.ReplyTimeoutException; import org.globus.cog.karajan.workflow.service.RequestReply; import org.globus.cog.karajan.workflow.service.channels.ChannelIOException; +import org.globus.cog.karajan.workflow.service.channels.ChannelManager; import org.globus.cog.karajan.workflow.service.channels.KarajanChannel; public abstract class Command extends RequestReply { private static final Logger logger = Logger.getLogger(Command.class); - + private static final Timer timer; - + static { timer = new Timer(); } - public static final int DEFAULT_REPLY_TIMEOUT = 1000 * 60; - public static final int MAX_RETRIES = 2; + public static final int DEFAULT_REPLY_TIMEOUT = 10000 * 60; + public static final int DEFAULT_MAX_RETRIES = 2; private int replyTimeout = DEFAULT_REPLY_TIMEOUT; + private int maxRetries = DEFAULT_MAX_RETRIES; private Callback cb; private String errorMsg; @@ -86,6 +88,7 @@ public void send() throws ProtocolException { KarajanChannel channel = getChannel(); + logger.info("Sending " + this + " on " + channel); List outData = getOutData(); if (channel == null) { throw new ProtocolException("Unregistered command"); @@ -96,6 +99,9 @@ logger.debug(ppOutData("CMD")); } try { + if (logger.isInfoEnabled()) { + logger.info(this + " CMD: " + this); + } channel.sendTaggedData(getId(), fin, getOutCmd().getBytes()); if (!fin) { Iterator i = outData.iterator(); @@ -107,7 +113,7 @@ timer.schedule(timeout = new Timeout(), replyTimeout); } catch (ChannelIOException e) { - reexecute(); + reexecute(e.getMessage(), e); } } @@ -142,6 +148,14 @@ this.replyTimeout = replyTimeout; } + public int getMaxRetries() { + return maxRetries; + } + + public void setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + } + public void receiveCompleted() { if (timeout == null) { return; @@ -181,35 +195,44 @@ public void channelClosed() { if (!this.isInDataReceived()) { - reexecute(); + reexecute("Channel closed", null); } } - protected void reexecute() { - getChannel().getChannelContext().reexecute(this); - } - - protected void handleReplyTimeout() { - timeout = null; - if (++retries > MAX_RETRIES) { - errorReceived("Reply timeout", new ReplyTimeoutException()); + protected void reexecute(String message, Exception ex) { + if (++retries > maxRetries) { + errorReceived(message, ex); } else { try { + setChannel(ChannelManager.getManager().reserveChannel( + getChannel().getChannelContext())); send(); } catch (ProtocolException e) { errorReceived(e.getMessage(), e); } + catch (Exception e) { + reexecute(e.getMessage(), ex); + } } } - + + protected void handleReplyTimeout() { + timeout = null; + reexecute("Reply timeout", new ReplyTimeoutException()); + } + private class Timeout extends TimerTask { public void run() { handleReplyTimeout(); } } + public String toString() { + return "Command(" + this.getOutCmd() + ")"; + } + public static interface Callback { void replyReceived(Command cmd); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |