|
From: <ha...@us...> - 2008-09-16 10:54:28
|
Revision: 2170
http://cogkit.svn.sourceforge.net/cogkit/?rev=2170&view=rev
Author: hategan
Date: 2008-09-16 17:54:08 +0000 (Tue, 16 Sep 2008)
Log Message:
-----------
better handling of channel errors
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java 2008-09-16 17:51:02 UTC (rev 2169)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java 2008-09-16 17:54:08 UTC (rev 2170)
@@ -21,9 +21,12 @@
import java.util.Set;
import org.apache.log4j.Logger;
+import org.globus.cog.karajan.workflow.service.RemoteConfiguration;
import org.globus.cog.karajan.workflow.service.RequestManager;
+import org.globus.cog.karajan.workflow.service.commands.ChannelConfigurationCommand;
-public abstract class AbstractStreamKarajanChannel extends AbstractKarajanChannel {
+public abstract class AbstractStreamKarajanChannel extends AbstractKarajanChannel implements
+ Purgeable {
public static final Logger logger = Logger.getLogger(AbstractStreamKarajanChannel.class);
public static final int STATE_IDLE = 0;
@@ -40,8 +43,8 @@
private int state, tag, flags, len;
protected AbstractStreamKarajanChannel(RequestManager requestManager,
- ChannelContext channelContext) {
- super(requestManager, channelContext);
+ ChannelContext channelContext, boolean client) {
+ super(requestManager, channelContext, client);
rhdr = new byte[HEADER_LEN];
}
@@ -69,6 +72,33 @@
this.contact = contact;
}
+ protected abstract void reconnect() throws ChannelException;
+
+ protected synchronized void handleChannelException(Exception e) {
+ logger.info("Channel config: " + getChannelContext().getConfiguration());
+ ChannelManager.getManager().handleChannelException(this, e);
+ try {
+ getSender().purge(this, new NullChannel(true));
+ }
+ catch (IOException e1) {
+ logger.warn("Failed to purge queued messages", e1);
+ }
+ }
+
+ protected void configure() throws Exception {
+ URI callbackURI = null;
+ ChannelContext sc = getChannelContext();
+ if (sc.getConfiguration().hasOption(RemoteConfiguration.CALLBACK)) {
+ callbackURI = getCallbackURI();
+ }
+ String remoteID = sc.getChannelID().getRemoteID();
+
+ ChannelConfigurationCommand ccc = new ChannelConfigurationCommand(sc.getConfiguration(),
+ callbackURI);
+ ccc.execute(this);
+ logger.info("Channel configured");
+ }
+
public synchronized void sendTaggedData(int tag, int flags, byte[] data) {
getSender().enqueue(tag, flags, data, this);
}
@@ -113,31 +143,35 @@
return true;
}
+ public void purge(KarajanChannel channel) throws IOException {
+ getSender().purge(this, channel);
+ }
+
protected void register() {
getMultiplexer(FAST).register(this);
}
private static final int SENDER_COUNT = 1;
private static Sender[] sender;
- private static int crtSender;
+ private static int crtSender;
private static synchronized Sender getSender() {
- if (sender == null) {
- sender = new Sender[SENDER_COUNT];
- for (int i = 0; i < SENDER_COUNT; i++) {
- sender[i] = new Sender();
- sender[i].start();
- }
- }
- try {
- return sender[crtSender++];
- }
- finally {
- if (crtSender == SENDER_COUNT) {
- crtSender = 0;
- }
- }
- }
+ if (sender == null) {
+ sender = new Sender[SENDER_COUNT];
+ for (int i = 0; i < SENDER_COUNT; i++) {
+ sender[i] = new Sender();
+ sender[i].start();
+ }
+ }
+ try {
+ return sender[crtSender++];
+ }
+ finally {
+ if (crtSender == SENDER_COUNT) {
+ crtSender = 0;
+ }
+ }
+ }
private static class SendEntry {
public final int tag, flags;
@@ -163,14 +197,15 @@
shdr = new byte[HEADER_LEN];
}
- public synchronized void enqueue(int tag, int flags, byte[] data, AbstractStreamKarajanChannel channel) {
+ public synchronized void enqueue(int tag, int flags, byte[] data,
+ AbstractStreamKarajanChannel channel) {
queue.addLast(new SendEntry(tag, flags, data, channel));
notify();
}
public void run() {
try {
- SendEntry e;
+ SendEntry e;
while (true) {
synchronized (this) {
while (queue.isEmpty()) {
@@ -181,14 +216,22 @@
try {
send(e.tag, e.flags, e.data, e.channel.getOutputStream());
}
+ catch (IOException ex) {
+ logger.info("Channel IOException", ex);
+ synchronized (this) {
+ queue.addFirst(e);
+ }
+ e.channel.handleChannelException(ex);
+ }
catch (Exception ex) {
- ex.printStackTrace();
- try {
- e.channel.getChannelContext().getRegisteredCommand(e.tag).errorReceived(null, ex);
- }
- catch (Exception exx) {
- logger.warn(exx);
- }
+ ex.printStackTrace();
+ try {
+ e.channel.getChannelContext().getRegisteredCommand(e.tag).errorReceived(
+ null, ex);
+ }
+ catch (Exception exx) {
+ logger.warn(exx);
+ }
}
}
}
@@ -197,11 +240,25 @@
}
}
+ public void purge(KarajanChannel source, KarajanChannel channel) throws IOException {
+ SendEntry e;
+ synchronized (this) {
+ Iterator i = queue.iterator();
+ while (i.hasNext()) {
+ e = (SendEntry) i.next();
+ if (e.channel == source) {
+ channel.sendTaggedData(e.tag, e.flags, e.data);
+ i.remove();
+ }
+ }
+ }
+ }
+
private void send(int tag, int flags, byte[] data, OutputStream os) throws IOException {
pack(shdr, 0, tag);
pack(shdr, 4, flags);
pack(shdr, 8, data.length);
- synchronized(os) {
+ synchronized (os) {
os.write(shdr);
os.write(data);
os.flush();
@@ -249,6 +306,9 @@
Iterator i = channels.iterator();
while (i.hasNext()) {
AbstractStreamKarajanChannel channel = (AbstractStreamKarajanChannel) i.next();
+ if (channel.isClosed()) {
+ i.remove();
+ }
try {
any |= channel.step();
}
@@ -265,6 +325,8 @@
while (i.hasNext()) {
channels.add(i.next());
}
+ remove.clear();
+ add.clear();
}
if (!any) {
Thread.sleep(20);
@@ -280,7 +342,7 @@
if (logger.isDebugEnabled()) {
logger.debug("Channel exception caught", e);
}
- channel.shutdown();
+ channel.handleChannelException(e);
remove.add(channel);
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|