From: <ha...@us...> - 2008-10-10 23:21:07
|
Revision: 2229 http://cogkit.svn.sourceforge.net/cogkit/?rev=2229&view=rev Author: hategan Date: 2008-10-10 23:21:02 +0000 (Fri, 10 Oct 2008) Log Message: ----------- don't use async timeout if executing synchronously Modified Paths: -------------- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java 2008-10-10 23:18:30 UTC (rev 2228) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java 2008-10-10 23:21:02 UTC (rev 2229) @@ -32,7 +32,7 @@ timer = new Timer(); } - public static final int DEFAULT_REPLY_TIMEOUT = 10000 * 60; + public static final int DEFAULT_REPLY_TIMEOUT = 2 * 60 * 1000; //2 minutes public static final int DEFAULT_MAX_RETRIES = 2; private int replyTimeout = DEFAULT_REPLY_TIMEOUT; private int maxRetries = DEFAULT_MAX_RETRIES; @@ -57,11 +57,14 @@ } public void waitForReply() throws ReplyTimeoutException { - if (!this.isInDataReceived()) { - synchronized (this) { + synchronized (this) { + if (!this.isInDataReceived()) { long start = System.currentTimeMillis(); long left = replyTimeout; while (!this.isInDataReceived()) { + if (left <= 0) { + throw new ReplyTimeoutException(); + } try { wait(left); } @@ -69,9 +72,6 @@ e.printStackTrace(); } left = replyTimeout - (System.currentTimeMillis() - start); - if (left <= 0) { - throw new ReplyTimeoutException(); - } } } } @@ -104,27 +104,30 @@ if (logger.isInfoEnabled()) { logger.info(this + " CMD: " + this); } - channel.sendTaggedData(getId(), fin, getOutCmd().getBytes()); + int id = getId(); + if (id == NOID) { + logger.warn("Command has NOID: " + this, new Throwable()); + } + channel.sendTaggedData(id, fin, getOutCmd().getBytes()); if (!fin) { Iterator i = outData.iterator(); while (i.hasNext()) { byte[] buf = (byte[]) i.next(); - channel.sendTaggedData(getId(), !i.hasNext(), buf); + channel.sendTaggedData(id, !i.hasNext(), buf); } } - setupReplyTimeoutChecker(); } catch (ChannelIOException e) { reexecute(e.getMessage(), e); } } - + protected void setupReplyTimeoutChecker() { - timer.schedule(timeout = new Timeout(), replyTimeout); + timer.schedule(timeout = new Timeout(), replyTimeout); } public byte[] execute(KarajanChannel channel) throws ProtocolException, IOException { - executeAsync(channel); + send(channel, false); waitForReply(); if (errorMsg != null) { throw new ProtocolException(errorMsg, exception); @@ -142,8 +145,18 @@ } public void executeAsync(KarajanChannel channel) throws ProtocolException { + send(channel, true); + } + + protected void send(KarajanChannel channel, boolean async) throws ProtocolException { channel.registerCommand(this); + if (getId() == NOID) { + logger.warn("Registration failed for command " + this + " on channel " + channel); + } send(); + if (async) { + setupReplyTimeoutChecker(); + } } public int getReplyTimeout() { @@ -163,10 +176,7 @@ } public void receiveCompleted() { - if (timeout == null) { - return; - } - else { + if (timeout != null) { timeout.cancel(); timeout = null; } @@ -207,9 +217,11 @@ protected void reexecute(String message, Exception ex) { if (++retries > maxRetries) { + logger.info(this + ": failed too many times", ex); errorReceived(message, ex); } else { + logger.info(this + ": re-sending"); try { setChannel(ChannelManager.getManager().reserveChannel( getChannel().getChannelContext())); @@ -226,6 +238,7 @@ protected void handleReplyTimeout() { timeout = null; + logger.info(this + ": handling reply timeout"); reexecute("Reply timeout", new ReplyTimeoutException()); } @@ -236,7 +249,7 @@ } public String toString() { - return "Command(" + this.getOutCmd() + ")"; + return "Command(" + this.getId() + ", " + this.getOutCmd() + ")"; } public static interface Callback { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |