From: <ha...@us...> - 2008-10-05 19:08:42
|
Revision: 2221 http://cogkit.svn.sourceforge.net/cogkit/?rev=2221&view=rev Author: hategan Date: 2008-10-05 17:11:29 +0000 (Sun, 05 Oct 2008) Log Message: ----------- cleaned up channel startup; the muxer is a critical process - hard abort if any errors pop up Modified Paths: -------------- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractTCPChannel.java trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSChannel.java Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java 2008-10-05 17:08:29 UTC (rev 2220) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java 2008-10-05 17:11:29 UTC (rev 2221) @@ -156,7 +156,6 @@ } - private static final int SENDER_COUNT = 1; private static Sender[] sender; private static int crtSender; @@ -283,7 +282,7 @@ if (multiplexer == null) { multiplexer = new Multiplexer[MUX_COUNT]; for (int i = 0; i < MUX_COUNT; i++) { - multiplexer[i] = new Multiplexer(); + multiplexer[i] = new Multiplexer(i); multiplexer[i].start(); } } @@ -291,11 +290,16 @@ } protected static class Multiplexer extends Thread { + public static final Logger logger = Logger.getLogger(Multiplexer.class); + private Set channels; private List remove, add; + private boolean terminated; + private int id; - public Multiplexer() { - super("Channel multiplexer"); + public Multiplexer(int id) { + super("Channel multiplexer " + id); + this.id = id; setDaemon(true); channels = new HashSet(); remove = new ArrayList(); @@ -304,9 +308,16 @@ public synchronized void register(AbstractStreamKarajanChannel channel) { add.add(channel); + if (logger.isInfoEnabled()) { + logger.info("(" + id + ") Scheduling " + channel + " for addition"); + } + if (terminated) { + logger.warn("Trying to add a channel to a stopped multiplexer"); + } } public void run() { + logger.info("Multiplexer " + id + " started"); boolean any; try { while (true) { @@ -321,17 +332,24 @@ any |= channel.step(); } catch (Exception e) { - shutdown(channel, e); + try { + shutdown(channel, e); + } + catch (Exception ee) { + logger.warn("Failed to shut down channel", e); + } } } synchronized (this) { i = remove.iterator(); while (i.hasNext()) { - channels.remove(i.next()); + Object r = i.next(); + channels.remove(r); } i = add.iterator(); while (i.hasNext()) { - channels.add(i.next()); + Object a = i.next(); + channels.add(a); } remove.clear(); add.clear(); @@ -344,6 +362,15 @@ catch (Exception e) { logger.warn("Exception in channel multiplexer", e); } + catch (Error e) { + logger.error("Error in multiplexer", e); + e.printStackTrace(); + System.exit(10); + } + finally { + logger.info("Multiplexer finished"); + terminated = true; + } } private void shutdown(AbstractStreamKarajanChannel channel, Exception e) { Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractTCPChannel.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractTCPChannel.java 2008-10-05 17:08:29 UTC (rev 2220) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractTCPChannel.java 2008-10-05 17:11:29 UTC (rev 2221) @@ -11,14 +11,10 @@ import java.io.IOException; import java.net.Socket; -import java.net.URI; -import org.globus.cog.karajan.workflow.service.ProtocolException; -import org.globus.cog.karajan.workflow.service.RemoteConfiguration; import org.globus.cog.karajan.workflow.service.RequestManager; -import org.globus.cog.karajan.workflow.service.commands.ChannelConfigurationCommand; -public abstract class AbstractTCPChannel extends AbstractStreamKarajanChannel implements Runnable { +public abstract class AbstractTCPChannel extends AbstractStreamKarajanChannel { private Socket socket; private boolean started; private Exception startException; @@ -42,18 +38,7 @@ else { setName("S(" + socket.getLocalAddress() + ")"); } - new Thread(this).start(); - while (!isStarted() && !isClosed() && startException == null) { - try { - wait(); - } - catch (InterruptedException e) { - } - } - if (startException != null) { - logger.debug("Exception while starting channel", startException); - throw new ChannelException(startException); - } + initialize(); logger.info(getContact() + "Channel started"); if (isClient()) { try { @@ -65,30 +50,16 @@ } } - public void run() { + private void initialize() throws ChannelException { ChannelContext context = getChannelContext(); try { - try { - started = true; - } - catch (Exception e) { - startException = e; - e.printStackTrace(); - return; - } - finally { - synchronized (this) { - notifyAll(); - } - } initializeConnection(); register(); + started = true; } catch (Exception e) { - if (!closing) { - logger.warn("Exception in channel", e); - context.notifyRegisteredListeners(e); - } + logger.debug("Exception while starting channel", e); + throw new ChannelException(e); } } @@ -96,7 +67,6 @@ } - public void shutdown() { if (isLocalShutdown()) { return; Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSChannel.java =================================================================== --- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSChannel.java 2008-10-05 17:08:29 UTC (rev 2220) +++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSChannel.java 2008-10-05 17:11:29 UTC (rev 2221) @@ -30,7 +30,7 @@ import org.ietf.jgss.GSSCredential; import org.ietf.jgss.GSSManager; -public class GSSChannel extends AbstractTCPChannel implements Runnable { +public class GSSChannel extends AbstractTCPChannel { private static final Logger logger = Logger.getLogger(GSSChannel.class); private GssSocket socket; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |