|
From: <ha...@us...> - 2008-09-16 10:56:14
|
Revision: 2171
http://cogkit.svn.sourceforge.net/cogkit/?rev=2171&view=rev
Author: hategan
Date: 2008-09-16 17:56:08 +0000 (Tue, 16 Sep 2008)
Log Message:
-----------
...
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractTCPChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/BufferingChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelManager.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/NullChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/TCPChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/UDPChannel.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java 2008-09-16 17:54:08 UTC (rev 2170)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java 2008-09-16 17:56:08 UTC (rev 2171)
@@ -15,17 +15,21 @@
import java.net.URI;
import java.util.LinkedList;
import java.util.List;
+import java.util.TimerTask;
import org.apache.log4j.Logger;
import org.globus.cog.karajan.workflow.service.NoSuchHandlerException;
import org.globus.cog.karajan.workflow.service.ProtocolException;
+import org.globus.cog.karajan.workflow.service.RemoteConfiguration;
import org.globus.cog.karajan.workflow.service.RequestManager;
import org.globus.cog.karajan.workflow.service.Service;
+import org.globus.cog.karajan.workflow.service.RemoteConfiguration.Entry;
import org.globus.cog.karajan.workflow.service.commands.Command;
import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
public abstract class AbstractKarajanChannel implements KarajanChannel {
private static final Logger logger = Logger.getLogger(AbstractKarajanChannel.class);
+ public static final int DEFAULT_HEARTBEAT_INTERVAL = 5 * 60; //seconds
private ChannelContext context;
private volatile int usageCount, longTermUsageCount;
@@ -34,8 +38,10 @@
private boolean localShutdown, closed;
private String name;
private Service callbackService;
+ private final boolean client;
- protected AbstractKarajanChannel(RequestManager requestManager, ChannelContext channelContext) {
+ protected AbstractKarajanChannel(RequestManager requestManager, ChannelContext channelContext,
+ boolean client) {
if (channelContext != null) {
this.context = channelContext;
}
@@ -44,8 +50,47 @@
}
this.requestManager = requestManager;
registeredMaps = new LinkedList();
+ this.client = client;
+ configureHeartBeat();
}
+ protected void configureHeartBeat() {
+ TimerTask heartBeatTask;
+ Entry config = context.getConfiguration();
+ int heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
+ if (config != null && config.hasOption(RemoteConfiguration.HEARTBEAT)) {
+ if (config.hasArg(RemoteConfiguration.HEARTBEAT)) {
+ heartBeatInterval = Integer.parseInt(config.getArg(RemoteConfiguration.HEARTBEAT));
+ }
+ heartBeatInterval *= 1000;
+ }
+ if (!isOffline() && isClient()) {
+ heartBeatTask = new HeartBeatTask(this);
+ context.getTimer().schedule(heartBeatTask, heartBeatInterval, heartBeatInterval);
+ }
+ else {
+ if (logger.isInfoEnabled()) {
+ if (config == null) {
+ logger.info(this + ": Disabling heartbeats (config is null)");
+ }
+ else if (!config.hasOption(RemoteConfiguration.HEARTBEAT)) {
+ logger.info(this + ": Disabling heartbeats (disabled in config)");
+ }
+ else if (isOffline()) {
+ logger.info(this + ": Disabling heartbeats (offline channel)");
+ }
+ else if (!isClient()) {
+ logger.info(this + ": Disabling heartbeats (not a client)");
+ }
+ }
+ }
+ if (!isOffline() && !isClient()) {
+ int mult = 2;
+ heartBeatTask = new HeartBeatCheckTask(this, heartBeatInterval, mult);
+ context.getTimer().schedule(heartBeatTask, mult * heartBeatInterval, mult * heartBeatInterval);
+ }
+ }
+
public void registerCommand(Command cmd) throws ProtocolException {
context.registerCommand(cmd);
cmd.register(this);
@@ -106,7 +151,7 @@
}
return crt + count;
}
-
+
public static void pack(byte[] buf, int offset, int value) {
buf[offset] = (byte) (value & 0xff);
buf[offset + 1] = (byte) ((value >> 8) & 0xff);
@@ -141,7 +186,7 @@
public RequestManager getRequestManager() {
return requestManager;
}
-
+
public void setRequestManager(RequestManager rm) {
if (rm == null) {
throw new IllegalArgumentException("The request manager cannot be null");
@@ -189,7 +234,7 @@
}
public boolean isClient() {
- return false;
+ return client;
}
public String getName() {
@@ -223,14 +268,17 @@
protected void handleReply(int tag, boolean fin, boolean error, int len, byte[] data) {
if (logger.isDebugEnabled()) {
- logger.debug(this + " REPL<: tag = " + tag + ", fin = " + fin
- + ", err = " + error + ", datalen = " + len + ", data = " + ppByteBuf(data));
+ logger.debug(this + " REPL<: tag = " + tag + ", fin = " + fin + ", err = " + error
+ + ", datalen = " + len + ", data = " + ppByteBuf(data));
}
Command cmd = getChannelContext().getRegisteredCommand(tag);
if (cmd != null) {
try {
cmd.replyReceived(data);
if (fin) {
+ if (logger.isInfoEnabled()) {
+ logger.info(this + " REPL: " + cmd);
+ }
if (error) {
cmd.errorReceived();
}
@@ -249,15 +297,15 @@
unregisteredSender(tag);
}
}
-
+
protected void unregisteredSender(int tag) {
logger.warn(getName() + " Recieved reply to unregistered sender. Tag: " + tag);
}
protected void handleRequest(int tag, boolean fin, boolean error, int len, byte[] data) {
if (logger.isDebugEnabled()) {
- logger.debug(this + " REQ<: tag = " + tag + ", fin = " + fin
- + ", err = " + error + ", datalen = " + len + ", data = " + ppByteBuf(data));
+ logger.debug(this + " REQ<: tag = " + tag + ", fin = " + fin + ", err = " + error
+ + ", datalen = " + len + ", data = " + ppByteBuf(data));
}
RequestHandler handler = getChannelContext().getRegisteredHandler(tag);
try {
@@ -278,8 +326,11 @@
}
if (fin) {
try {
+ if (logger.isInfoEnabled()) {
+ logger.info(this + " REQ: " + handler);
+ }
if (error) {
- // TODO
+ handler.errorReceived();
}
else {
handler.receiveCompleted();
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractTCPChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractTCPChannel.java 2008-09-16 17:54:08 UTC (rev 2170)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractTCPChannel.java 2008-09-16 17:56:08 UTC (rev 2171)
@@ -9,29 +9,34 @@
*/
package org.globus.cog.karajan.workflow.service.channels;
+import java.io.IOException;
import java.net.Socket;
+import java.net.URI;
+import org.globus.cog.karajan.workflow.service.ProtocolException;
+import org.globus.cog.karajan.workflow.service.RemoteConfiguration;
import org.globus.cog.karajan.workflow.service.RequestManager;
+import org.globus.cog.karajan.workflow.service.commands.ChannelConfigurationCommand;
public abstract class AbstractTCPChannel extends AbstractStreamKarajanChannel implements Runnable {
private Socket socket;
private boolean started;
private Exception startException;
- private final boolean client;
private boolean closing;
public AbstractTCPChannel(RequestManager requestManager, ChannelContext channelContext,
boolean client) {
- super(requestManager, channelContext);
- this.client = client;
+ super(requestManager, channelContext, client);
}
- protected void setSocket(Socket socket) {
+ protected void setSocket(Socket socket) throws IOException {
this.socket = socket;
+ setInputStream(socket.getInputStream());
+ setOutputStream(socket.getOutputStream());
}
public synchronized void start() throws ChannelException {
- if (client) {
+ if (isClient()) {
setName("C(" + socket.getLocalAddress() + ")");
}
else {
@@ -50,14 +55,20 @@
throw new ChannelException(startException);
}
logger.info(getContact() + "Channel started");
+ if (isClient()) {
+ try {
+ configure();
+ }
+ catch (Exception e) {
+ throw new ChannelException("Failed to configure channel", e);
+ }
+ }
}
public void run() {
ChannelContext context = getChannelContext();
try {
try {
- setInputStream(socket.getInputStream());
- setOutputStream(socket.getOutputStream());
started = true;
}
catch (Exception e) {
@@ -85,7 +96,11 @@
}
+
public void shutdown() {
+ if (isLocalShutdown()) {
+ return;
+ }
try {
setLocalShutdown();
ChannelManager.getManager().shutdownChannel(this);
@@ -94,13 +109,13 @@
logger.debug("Channel already shutting down");
}
catch (Exception e) {
- logger.warn(getContact() + "Could not shutdown channel", e);
+ logger.warn(getContact() + ": Could not shutdown channel", e);
}
super.close();
synchronized (this) {
notify();
}
- logger.info(getContact() + "Channel terminated");
+ logger.info(getContact() + ": Channel terminated");
}
public void close() {
@@ -108,19 +123,15 @@
try {
if (!socket.isClosed()) {
socket.close();
- logger.info(getContact() + "Channel shut down");
+ logger.info(getContact() + ": Channel shut down");
}
}
catch (Exception e) {
- logger.warn(getContact() + "Failed to close socket", e);
+ logger.warn(getContact() + ": Failed to close socket", e);
}
super.close();
}
- public boolean isClient() {
- return client;
- }
-
public boolean isStarted() {
return started;
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/BufferingChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/BufferingChannel.java 2008-09-16 17:54:08 UTC (rev 2170)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/BufferingChannel.java 2008-09-16 17:56:08 UTC (rev 2171)
@@ -25,7 +25,7 @@
private List buffer;
public BufferingChannel(ChannelContext channelContext) {
- super(null, channelContext);
+ super(null, channelContext, false);
buffer = new ArrayList();
}
@@ -70,9 +70,13 @@
}
public boolean isOffline() {
- return false;
+ return true;
}
+ public boolean isStarted() {
+ return true;
+ }
+
public String toString() {
return "BufferingChannel";
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java 2008-09-16 17:54:08 UTC (rev 2170)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java 2008-09-16 17:56:08 UTC (rev 2171)
@@ -45,6 +45,8 @@
private ServiceContext serviceContext;
private GSSCredential cred;
private ChannelAttributes attr;
+ private int reconnectionAttempts;
+ private long lastHeartBeat;
public ChannelContext() {
this(new ServiceContext(null));
@@ -219,4 +221,24 @@
data.put(name, o);
}
}
+
+ public int getReconnectionAttempts() {
+ return reconnectionAttempts;
+ }
+
+ public void setReconnectionAttempts(int reconnectionAttempts) {
+ this.reconnectionAttempts = reconnectionAttempts;
+ }
+
+ public long getLastHeartBeat() {
+ return lastHeartBeat;
+ }
+
+ public void setLastHeartBeat(long lastHeartBeat) {
+ this.lastHeartBeat = lastHeartBeat;
+ }
+
+ public String toString() {
+ return data.toString();
+ }
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelManager.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelManager.java 2008-09-16 17:54:08 UTC (rev 2170)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelManager.java 2008-09-16 17:56:08 UTC (rev 2171)
@@ -19,6 +19,7 @@
import org.globus.cog.karajan.workflow.service.RemoteConfiguration;
import org.globus.cog.karajan.workflow.service.RequestManager;
import org.globus.cog.karajan.workflow.service.Service;
+import org.globus.cog.karajan.workflow.service.UserContext;
import org.ietf.jgss.GSSCredential;
import org.ietf.jgss.GSSException;
@@ -38,7 +39,7 @@
return manager;
}
- public ChannelManager() {
+ private ChannelManager() {
channels = new HashMap();
hosts = new HashMap();
rchannels = new HashMap();
@@ -75,15 +76,11 @@
HostCredentialPair hcp = new HostCredentialPair(host, cred);
channel = (MetaChannel) channels.get(hcp);
if (channel == null) {
- channel = new MetaChannel(rm == null ? clientRequestManager : rm,
- new ChannelContext());
- new Throwable().printStackTrace();
- System.err.println("Creating new meta channel with rm: "
- + channel.getRequestManager());
- channel.getChannelContext().setConfiguration(
- RemoteConfiguration.getDefault().find(host));
- channel.getChannelContext().setRemoteContact(host);
- channel.getChannelContext().setCredential(cred);
+ ChannelContext context = new ChannelContext();
+ context.setConfiguration(RemoteConfiguration.getDefault().find(host));
+ context.setRemoteContact(host);
+ context.setCredential(cred);
+ channel = new MetaChannel(rm == null ? clientRequestManager : rm, context);
registerChannel(hcp, channel);
}
}
@@ -115,10 +112,13 @@
public void registerChannel(String url, GSSCredential cred, KarajanChannel channel)
throws ChannelException {
synchronized (channels) {
- MetaChannel previous = new MetaChannel(channel.getRequestManager(),
- channel.getChannelContext());
+ HostCredentialPair hcp = new HostCredentialPair(url, cred);
+ MetaChannel previous = getMetaChannel(channel);
+ if (previous == null) {
+ previous = new MetaChannel(channel.getRequestManager(), channel.getChannelContext());
+ }
+ channels.put(hcp, previous);
previous.bind(channel);
- channels.put(new HostCredentialPair(url, cred), previous);
}
}
@@ -129,20 +129,18 @@
synchronized (channels) {
MetaChannel previous = (MetaChannel) channels.get(id);
if (previous != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("Re-registering " + id + " = " + channel);
+ if (logger.isInfoEnabled()) {
+ logger.info("Re-registering " + id + " = " + channel);
}
try {
/*
* Check to see if a rogue user is not trying to "steal" a
* channel
*/
- if (!previous.getChannelContext().getUserContext().getName().equals(
- channel.getChannelContext().getUserContext().getName())) {
+
+ if (!equals(getName(previous), getName(channel))) {
throw new ChannelException("Channel registration denied. Expected name: "
- + previous.getChannelContext().getUserContext().getName()
- + "; actual name: "
- + channel.getChannelContext().getUserContext().getName());
+ + getName(previous) + "; actual name: " + getName(channel));
}
}
catch (Exception e) {
@@ -173,6 +171,15 @@
}
}
+ private boolean equals(Object o1, Object o2) {
+ return o1 == null ? o2 == null : o1.equals(o2);
+ }
+
+ private String getName(KarajanChannel channel) {
+ UserContext uc = channel.getChannelContext().getUserContext();
+ return uc == null ? null : uc.getName();
+ }
+
public KarajanChannel reserveChannel(String host, GSSCredential cred, RequestManager rm)
throws ChannelException {
MetaChannel channel = getClientChannel(host, cred, rm);
@@ -187,41 +194,111 @@
}
public KarajanChannel reserveChannel(KarajanChannel channel) throws ChannelException {
- return reserveChannel(getChannel(channel));
+ return reserveChannel(getMetaChannel(channel));
}
public KarajanChannel reserveChannel(ChannelContext context) throws ChannelException {
- return reserveChannel(getChannel(context));
+ return reserveChannel(getMetaChannel(context));
}
+ private void connect(MetaChannel meta) throws ChannelException {
+ try {
+ String contact = meta.getChannelContext().getRemoteContact();
+ if (contact == null) {
+ // Should buffer things for a certain period of time
+ throw new ChannelException("Channel died and no contact available");
+ }
+ Client client = Client.newClient(contact, meta.getChannelContext(),
+ clientRequestManager);
+ channels.put(client.getChannel().getChannelContext().getChannelID(), meta);
+ meta.bind(client.getChannel());
+
+ }
+ catch (ChannelException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new ChannelException(e);
+ }
+ }
+
public KarajanChannel reserveChannel(MetaChannel meta) throws ChannelException {
synchronized (meta) {
meta.incUsageCount();
RemoteConfiguration.Entry config = meta.getChannelContext().getConfiguration();
if (meta.isOffline()) {
- try {
- String contact = meta.getChannelContext().getRemoteContact();
- if (contact == null) {
- // Should buffer things for a certain period of time
- throw new ChannelException("Channel died and no contact available");
- }
- Client client = Client.newClient(contact, meta.getChannelContext(),
- clientRequestManager);
- channels.put(client.getChannel().getChannelContext().getChannelID(), meta);
- meta.bind(client.getChannel());
+ connect(meta);
+ }
+ }
+ return meta;
+ }
+ public void handleChannelException(KarajanChannel channel, Exception e) {
+ logger.info("Handling channel exception");
+ if (channel.isOffline()) {
+ logger.info("Channel already shut down");
+ return;
+ }
+ channel.setLocalShutdown();
+ ChannelContext ctx = channel.getChannelContext();
+ RemoteConfiguration.Entry config = ctx.getConfiguration();
+ try {
+ if (config != null) {
+ if (config.hasOption(RemoteConfiguration.RECONNECT)) {
+ buffer(channel);
+ channel.close();
+ asyncReconnect(channel, e);
}
- catch (ChannelException e) {
- throw e;
+ else {
+ shutdownChannel(channel);
}
- catch (Exception e) {
- throw new ChannelException(e);
- }
}
+ else {
+ shutdownChannel(channel);
+ }
}
- return meta;
+ catch (Exception e2) {
+ logger.warn("Failed to shut down channel", e2);
+ }
+ logger.info("Channel exception handled");
}
+ private void asyncReconnect(final KarajanChannel channel, final Exception e) {
+ final ChannelContext ctx = channel.getChannelContext();
+ final RemoteConfiguration.Entry config = ctx.getConfiguration();
+ Thread t = new Thread() {
+ public void run() {
+ Exception ex = e;
+ int limit = Integer.MAX_VALUE;
+ if (config.hasArg(RemoteConfiguration.RECONNECT)) {
+ limit = Integer.parseInt(config.getArg(RemoteConfiguration.RECONNECT));
+ }
+ while (ctx.getReconnectionAttempts() < limit) {
+
+ try {
+ connect(getMetaChannel(channel));
+ ctx.setReconnectionAttempts(0);
+ return;
+ }
+ catch (ChannelException e2) {
+ ctx.setReconnectionAttempts(ctx.getReconnectionAttempts() + 1);
+ ex = e2;
+ }
+ try {
+ Thread.sleep((long) (1000 * Math.pow(2, ctx.getReconnectionAttempts())));
+ }
+ catch (InterruptedException e2) {
+ channel.getChannelContext().getService().irrecoverableChannelError(channel,
+ e2);
+ }
+ }
+ channel.getChannelContext().getService().irrecoverableChannelError(channel, ex);
+ }
+ };
+ t.setName("Reconnector");
+ t.start();
+ }
+
public void releaseChannel(KarajanChannel channel) {
if (channel == null) {
return;
@@ -238,18 +315,13 @@
}
}
else {
- try {
- unregisterChannel((MetaChannel) channel);
- }
- catch (ChannelException e) {
- logger.warn("Exception caught while unregistering channel", e);
- }
+ unregisterChannel((MetaChannel) channel);
}
}
}
}
- private void registerChannel(Object key, KarajanChannel channel) {
+ private void registerChannel(Object key, MetaChannel channel) {
synchronized (channels) {
channels.put(key, channel);
List l = (List) rchannels.get(channel);
@@ -261,41 +333,55 @@
}
}
- protected void unregisterChannel(MetaChannel channel) throws ChannelException {
- synchronized (channel) {
- RemoteConfiguration.Entry config = channel.getChannelContext().getConfiguration();
- if (config.hasOption(RemoteConfiguration.BUFFER)) {
- channel.bind(new BufferingChannel(channel.getChannelContext()));
- }
- else if (config.hasOption(RemoteConfiguration.POLL)) {
- if (config.hasArg(RemoteConfiguration.POLL)) {
- String time = config.getArg(RemoteConfiguration.POLL);
- int itime = Integer.parseInt(time);
- channel.poll(itime);
+ public void unregisterChannel(KarajanChannel channel) throws ChannelException {
+ unregisterChannel(getMetaChannel(channel));
+ }
+
+ protected void unregisterChannel(MetaChannel channel) {
+ try {
+ synchronized (channel) {
+ RemoteConfiguration.Entry config = channel.getChannelContext().getConfiguration();
+ if (config != null) {
+ if (config.hasOption(RemoteConfiguration.BUFFER)) {
+ channel.bind(new BufferingChannel(channel.getChannelContext()));
+ }
+ else if (config.hasOption(RemoteConfiguration.POLL)) {
+ if (config.hasArg(RemoteConfiguration.POLL)) {
+ String time = config.getArg(RemoteConfiguration.POLL);
+ int itime = Integer.parseInt(time);
+ channel.poll(itime);
+ }
+ else {
+ channel.poll(300);
+ }
+ channel.bind(new NullChannel());
+ }
+ else {
+ channel.bind(new NullChannel());
+ }
}
else {
- channel.poll(300);
+ channel.bind(new NullChannel());
}
- channel.bind(new NullChannel());
}
- else {
- channel.bind(new NullChannel());
- }
}
+ catch (ChannelException e) {
+ logger.error("Exception caught while unregistering channel", e);
+ }
}
public void shutdownChannel(KarajanChannel channel) throws ChannelException {
- unregisterChannel(getChannel(channel));
+ unregisterChannel(getMetaChannel(channel));
}
- private MetaChannel getChannel(KarajanChannel channel) throws ChannelException {
+ private MetaChannel getMetaChannel(KarajanChannel channel) throws ChannelException {
if (channel instanceof MetaChannel) {
return (MetaChannel) channel;
}
- return getChannel(channel.getChannelContext());
+ return getMetaChannel(channel.getChannelContext());
}
- private MetaChannel getChannel(ChannelContext context) throws ChannelException {
+ private MetaChannel getMetaChannel(ChannelContext context) throws ChannelException {
synchronized (channels) {
if (logger.isDebugEnabled()) {
logger.debug("\nLooking up " + context.getChannelID());
@@ -324,11 +410,11 @@
}
public void reserveLongTerm(KarajanChannel channel) throws ChannelException {
- getChannel(channel).incLongTermUsageCount();
+ getMetaChannel(channel).incLongTermUsageCount();
}
public void releaseLongTerm(KarajanChannel channel) throws ChannelException {
- getChannel(channel).decLongTermUsageCount();
+ getMetaChannel(channel).decLongTermUsageCount();
}
private static class HostCredentialPair {
@@ -371,4 +457,9 @@
}
}
+
+ private void buffer(KarajanChannel channel) throws ChannelException {
+ MetaChannel meta = getMetaChannel(channel);
+ meta.bind(new BufferingChannel(channel.getChannelContext()));
+ }
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSChannel.java 2008-09-16 17:54:08 UTC (rev 2170)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSChannel.java 2008-09-16 17:56:08 UTC (rev 2171)
@@ -16,8 +16,10 @@
import org.globus.cog.karajan.workflow.events.EventBus;
import org.globus.cog.karajan.workflow.service.FallbackAuthorization;
import org.globus.cog.karajan.workflow.service.GSSService;
+import org.globus.cog.karajan.workflow.service.RemoteConfiguration;
import org.globus.cog.karajan.workflow.service.RequestManager;
import org.globus.cog.karajan.workflow.service.UserContext;
+import org.globus.cog.karajan.workflow.service.commands.ChannelConfigurationCommand;
import org.globus.cog.karajan.workflow.service.commands.ShutdownCommand;
import org.globus.gsi.GSIConstants;
import org.globus.gsi.gssapi.GSSConstants;
@@ -28,7 +30,6 @@
import org.globus.gsi.gssapi.net.GssSocket;
import org.globus.gsi.gssapi.net.GssSocketFactory;
import org.gridforum.jgss.ExtendedGSSContext;
-import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSCredential;
import org.ietf.jgss.GSSManager;
@@ -41,8 +42,11 @@
private boolean shuttingDown;
private Exception startException;
private Replier replier;
+ private int id;
+ private static int sid = 1;
- public GSSChannel(GssSocket socket, RequestManager requestManager, ChannelContext sc) {
+ public GSSChannel(GssSocket socket, RequestManager requestManager, ChannelContext sc)
+ throws IOException {
super(requestManager, sc, false);
setSocket(socket);
this.socket = socket;
@@ -56,6 +60,7 @@
}
private void init() {
+ id = sid++;
replier = new Replier(this);
EventBus.initialize();
}
@@ -65,8 +70,13 @@
}
public void start() throws ChannelException {
+ reconnect();
+ super.start();
+ }
+
+ protected void reconnect() throws ChannelException {
try {
- if (socket == null) {
+ if (getContact() != null) {
HostAuthorization hostAuthz = new HostAuthorization("host");
Authorization authz = new FallbackAuthorization(new Authorization[] { hostAuthz,
@@ -79,7 +89,7 @@
GSSManager manager = new GlobusGSSManagerImpl();
ExtendedGSSContext gssContext = (ExtendedGSSContext) manager.createContext(null,
- GSSConstants.MECH_OID, cred, cred.getRemainingLifetime());
+ GSSConstants.MECH_OID, cred, cred.getRemainingLifetime());
gssContext.requestAnonymity(false);
gssContext.requestCredDeleg(false);
@@ -89,21 +99,20 @@
URI contact = getContact();
socket = (GssSocket) GssSocketFactory.getDefault().createSocket(contact.getHost(),
contact.getPort(), gssContext);
- setSocket(socket);
socket.setKeepAlive(true);
socket.setSoTimeout(0);
socket.setWrapMode(GSIConstants.MODE_SSL.intValue());
socket.setAuthorization(authz);
+ setSocket(socket);
logger.info("Connected to " + contact);
- this.getChannelContext().setRemoteContact(contact.toString());
+ getChannelContext().setRemoteContact(contact.toString());
}
}
catch (Exception e) {
throw new ChannelException("Failed to start channel " + this, e);
}
- super.start();
}
protected void initializeConnection() {
@@ -168,7 +177,7 @@
}
public String toString() {
- return "GSSC-" + getContact();
+ return "GSS" + (isClient() ? "C" : "S") + "Channel-" + getContact() + "(" + id + ")";
}
protected synchronized void ensureCallbackServiceStarted() throws Exception {
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java 2008-09-16 17:54:08 UTC (rev 2170)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java 2008-09-16 17:56:08 UTC (rev 2171)
@@ -55,6 +55,8 @@
void setChannelContext(ChannelContext context);
boolean isOffline();
+
+ boolean isStarted();
void unregisterCommand(Command cmd);
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java 2008-09-16 17:54:08 UTC (rev 2170)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java 2008-09-16 17:56:08 UTC (rev 2171)
@@ -30,11 +30,11 @@
private boolean shuttingDown;
public MetaChannel(ChannelContext channelContext) {
- super(null, channelContext);
+ super(null, channelContext, false);
}
public MetaChannel(RequestManager requestManager, ChannelContext channelContext) {
- super(requestManager, channelContext);
+ super(requestManager, channelContext, false);
}
public synchronized void sendTaggedData(int tag, int flags, byte[] data) {
@@ -102,12 +102,7 @@
}
deactivator = new TimerTask() {
public void run() {
- try {
- ChannelManager.getManager().unregisterChannel(MetaChannel.this);
- }
- catch (ChannelException e) {
- logger.warn("Exception caught while unregistering channel", e);
- }
+ ChannelManager.getManager().unregisterChannel(MetaChannel.this);
}
};
getTimer().schedule(deactivator, (long) seconds * 1000);
@@ -164,6 +159,16 @@
return false;
}
+ public boolean isStarted() {
+ KarajanChannel crt = current;
+ if (crt != null) {
+ return crt.isStarted();
+ }
+ else {
+ return false;
+ }
+ }
+
public void start() throws ChannelException {
}
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/NullChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/NullChannel.java 2008-09-16 17:54:08 UTC (rev 2170)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/NullChannel.java 2008-09-16 17:56:08 UTC (rev 2171)
@@ -12,13 +12,21 @@
import org.globus.cog.karajan.workflow.service.UserContext;
public class NullChannel extends AbstractKarajanChannel {
+ private boolean sink;
protected NullChannel() {
- super(null, null);
+ super(null, null, false);
}
+
+ protected NullChannel(boolean sink) {
+ super(null, null, false);
+ this.sink = sink;
+ }
public void sendTaggedData(int i, int flags, byte[] bytes) {
- throw new ChannelIOException("Null channel");
+ if (!sink) {
+ throw new ChannelIOException("Null channel");
+ }
}
public UserContext getUserContext() {
@@ -36,4 +44,7 @@
public void start() throws ChannelException {
}
+ public boolean isStarted() {
+ return true;
+ }
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/TCPChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/TCPChannel.java 2008-09-16 17:54:08 UTC (rev 2170)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/TCPChannel.java 2008-09-16 17:56:08 UTC (rev 2171)
@@ -9,6 +9,7 @@
*/
package org.globus.cog.karajan.workflow.service.channels;
+import java.io.IOException;
import java.net.Socket;
import java.net.URI;
@@ -25,13 +26,19 @@
setName(contact.toString());
}
- public TCPChannel(Socket socket, RequestManager requestManager, ChannelContext channelContext) {
+ public TCPChannel(Socket socket, RequestManager requestManager, ChannelContext channelContext)
+ throws IOException {
super(requestManager, channelContext, false);
setSocket(socket);
uc = new UserContext(null, channelContext);
}
public void start() throws ChannelException {
+ reconnect();
+ super.start();
+ }
+
+ protected void reconnect() throws ChannelException {
try {
if (contact != null) {
setSocket(new Socket(contact.getHost(), contact.getPort()));
@@ -40,7 +47,6 @@
catch (Exception e) {
throw new ChannelException("Failed to create socket", e);
}
- super.start();
}
public UserContext getUserContext() {
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/UDPChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/UDPChannel.java 2008-09-16 17:54:08 UTC (rev 2170)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/UDPChannel.java 2008-09-16 17:56:08 UTC (rev 2171)
@@ -40,10 +40,11 @@
private ServiceContext sc;
private UDPService service;
private Map tagSeq;
+ private boolean started;
public UDPChannel(DatagramSocket ds, ChannelContext context, RequestManager rm,
UDPService service, InetSocketAddress addr) {
- super(rm, context);
+ super(rm, context, false);
this.ds = ds;
this.service = service;
this.addr = addr.getAddress();
@@ -53,7 +54,7 @@
}
public UDPChannel(URI contact, ChannelContext context, RequestManager rm) {
- super(rm, context);
+ super(rm, context, true);
this.contact = contact;
}
@@ -80,6 +81,7 @@
addr = InetAddress.getByName(contact.getHost());
port = contact.getPort();
}
+ started = true;
}
catch (Exception e) {
throw new ChannelException("Failed to start UDP channel", e);
@@ -204,4 +206,8 @@
public InetSocketAddress getRemoteAddress() {
return new InetSocketAddress(addr, port);
}
+
+ public boolean isStarted() {
+ return started;
+ }
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|