|
From: <ha...@us...> - 2008-04-22 14:00:20
|
Revision: 1966
http://cogkit.svn.sourceforge.net/cogkit/?rev=1966&view=rev
Author: hategan
Date: 2008-04-22 07:00:10 -0700 (Tue, 22 Apr 2008)
Log Message:
-----------
updated service communication library
Modified Paths:
--------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/AbstractRequestManager.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ConnectionHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestManager.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Service.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ServiceContext.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UserContext.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/BufferingChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelContext.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelManager.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/KarajanChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/MetaChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/NullChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ReplyEvent.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/TagTable.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/ChannelConfigurationCommand.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/commands/Command.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/ChannelConfigurationHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/EchoHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/EventHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/GroupHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/RequestHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/ShutdownHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/StartHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/TestHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/handlers/UnknownCommandHandler.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/test/Services.java
Added Paths:
-----------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ChannelFactory.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/GSSService.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UDPService.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractTCPChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/ChannelAttributes.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/TCPChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/UDPChannel.java
Removed Paths:
-------------
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractSocketChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/GSSSocketChannel.java
trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/PlainSocketChannel.java
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/AbstractRequestManager.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/AbstractRequestManager.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/AbstractRequestManager.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -22,7 +22,7 @@
handlers = new Hashtable();
}
- protected void addHandler(String cmd, Class cls) {
+ public void addHandler(String cmd, Class cls) {
handlers.put(cmd, cls);
}
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ChannelFactory.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ChannelFactory.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ChannelFactory.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -0,0 +1,46 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Feb 14, 2008
+ */
+package org.globus.cog.karajan.workflow.service;
+
+import java.net.URI;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
+import org.globus.cog.karajan.workflow.service.channels.ChannelException;
+import org.globus.cog.karajan.workflow.service.channels.GSSChannel;
+import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
+import org.globus.cog.karajan.workflow.service.channels.TCPChannel;
+import org.globus.cog.karajan.workflow.service.channels.UDPChannel;
+
+public class ChannelFactory {
+ public static final Logger logger = Logger.getLogger(ChannelFactory.class);
+
+ public static KarajanChannel newChannel(URI contact, ChannelContext context, RequestManager rm)
+ throws ChannelException {
+ KarajanChannel channel;
+ if (contact.getScheme() == null || contact.getScheme().equals("tcps")) {
+ channel = new GSSChannel(contact, rm, context);
+ }
+ else if (contact.getScheme().equals("https")) {
+ channel = new GSSChannel(contact, rm, context);
+ }
+ else if (contact.getScheme().equals("tcp")) {
+ channel = new TCPChannel(contact, context, rm);
+ }
+ else if (contact.getScheme().equals("udp")) {
+ channel = new UDPChannel(contact, context, rm);
+ }
+ else {
+ throw new IllegalArgumentException("Scheme not supported: " + contact);
+ }
+ channel.start();
+ return channel;
+ }
+}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Client.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -10,7 +10,6 @@
package org.globus.cog.karajan.workflow.service;
import java.io.IOException;
-import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Hashtable;
@@ -18,30 +17,16 @@
import org.apache.log4j.Logger;
import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
-import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
-import org.globus.cog.karajan.workflow.service.channels.GSSSocketChannel;
import org.globus.cog.karajan.workflow.service.channels.KarajanChannel;
import org.globus.cog.karajan.workflow.service.commands.ChannelConfigurationCommand;
import org.globus.cog.karajan.workflow.service.commands.Command;
import org.globus.cog.karajan.workflow.service.commands.VersionCommand;
-import org.globus.gsi.GSIConstants;
-import org.globus.gsi.gssapi.GSSConstants;
-import org.globus.gsi.gssapi.GlobusGSSManagerImpl;
-import org.globus.gsi.gssapi.auth.Authorization;
-import org.globus.gsi.gssapi.auth.HostAuthorization;
-import org.globus.gsi.gssapi.auth.SelfAuthorization;
-import org.globus.gsi.gssapi.net.GssSocket;
-import org.globus.gsi.gssapi.net.GssSocketFactory;
-import org.gridforum.jgss.ExtendedGSSContext;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSManager;
public class Client {
private static final Logger logger = Logger.getLogger(Client.class);
private final URI contact;
- private Socket socket;
- private GSSSocketChannel channel;
+ private KarajanChannel channel;
private RequestManager requestManager;
private ChannelContext sc;
private static Service callback;
@@ -75,7 +60,12 @@
}
public static Client newClient(String contact, ChannelContext context) throws Exception {
- Client client = new Client(contact);
+ return newClient(contact, context, new ClientRequestManager());
+ }
+
+ public static Client newClient(String contact, ChannelContext context,
+ RequestManager requestManager) throws Exception {
+ Client client = new Client(contact, requestManager);
client.setChannelContext(context);
context.getChannelID().setClient(true);
synchronized (client) {
@@ -89,7 +79,12 @@
}
public Client(String contact) throws URISyntaxException {
+ this(contact, new ClientRequestManager());
+ }
+
+ public Client(String contact, RequestManager requestManager) throws URISyntaxException {
this.contact = new URI(contact);
+ this.requestManager = requestManager;
}
public void connect() throws Exception {
@@ -117,44 +112,15 @@
if (port == -1) {
port = 1984;
}
- HostAuthorization hostAuthz = new HostAuthorization("host");
-
- Authorization authz = new FallbackAuthorization(new Authorization[] { hostAuthz,
- SelfAuthorization.getInstance() });
-
-
- GSSCredential cred = sc.getCredential();
-
- GSSManager manager = new GlobusGSSManagerImpl();
- ExtendedGSSContext context = (ExtendedGSSContext) manager.createContext(null,
- GSSConstants.MECH_OID, cred, 2 * 3600);
-
- context.requestAnonymity(false);
- context.requestCredDeleg(false);
- context.setOption(GSSConstants.GSS_MODE, GSIConstants.MODE_SSL);
- context.setOption(GSSConstants.DELEGATION_TYPE, GSIConstants.DELEGATION_TYPE_LIMITED);
-
- socket = GssSocketFactory.getDefault().createSocket(host, port, context);
- socket.setKeepAlive(true);
- socket.setSoTimeout(0);
- ((GssSocket) socket).setWrapMode(GSIConstants.MODE_SSL.intValue());
- ((GssSocket) socket).setAuthorization(authz);
- requestManager = new ClientRequestManager();
- logger.info("Connected to " + host + ":" + port);
-
- sc.setRemoteContact(contact.toString());
- channel = new GSSSocketChannel((GssSocket) socket, requestManager, sc, true,
- contact.toString());
- channel.start();
-
- String callbackURL = null;
+ channel = ChannelFactory.newChannel(contact, sc, requestManager);
+ URI callbackURI = null;
if (sc.getConfiguration().hasOption(RemoteConfiguration.CALLBACK)) {
- callbackURL = ChannelManager.getManager().getCallbackURL();
+ callbackURI = channel.getCallbackURI();
}
String remoteID = getChannel().getChannelContext().getChannelID().getRemoteID();
ChannelConfigurationCommand ccc = new ChannelConfigurationCommand(
- sc.getConfiguration(), callbackURL);
+ sc.getConfiguration(), callbackURI);
ccc.execute(this.getChannel());
connected = true;
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ConnectionHandler.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ConnectionHandler.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ConnectionHandler.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -12,29 +12,32 @@
import java.net.Socket;
import org.apache.log4j.Logger;
-import org.globus.cog.karajan.workflow.service.channels.AbstractSocketChannel;
+import org.globus.cog.karajan.workflow.service.channels.AbstractTCPChannel;
import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
-import org.globus.cog.karajan.workflow.service.channels.GSSSocketChannel;
-import org.globus.cog.karajan.workflow.service.channels.PlainSocketChannel;
+import org.globus.cog.karajan.workflow.service.channels.GSSChannel;
+import org.globus.cog.karajan.workflow.service.channels.TCPChannel;
import org.globus.gsi.gssapi.net.GssSocket;
public class ConnectionHandler {
private static final Logger logger = Logger.getLogger(ConnectionHandler.class);
private final Socket socket;
- private final AbstractSocketChannel channel;
+ private final AbstractTCPChannel channel;
private final RequestManager requestManager;
public ConnectionHandler(Service service, Socket socket) {
+ this(service, socket, null);
+ }
+
+ public ConnectionHandler(Service service, Socket socket, RequestManager requestManager) {
this.socket = socket;
- requestManager = new ServiceRequestManager();
+ this.requestManager = requestManager == null ? new ServiceRequestManager() : requestManager;
if (socket instanceof GssSocket) {
- channel = new GSSSocketChannel((GssSocket) socket, requestManager, new ChannelContext(
- service), false);
+ channel = new GSSChannel((GssSocket) socket, this.requestManager, new ChannelContext(
+ service));
}
else {
- channel = new PlainSocketChannel(socket, requestManager, new ChannelContext(service),
- false);
+ channel = new TCPChannel(socket, this.requestManager, new ChannelContext(service));
}
}
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/GSSService.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/GSSService.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/GSSService.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -0,0 +1,273 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 18, 2005
+ */
+package org.globus.cog.karajan.workflow.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.URI;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.abstraction.impl.common.AbstractionFactory;
+import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
+import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
+import org.globus.cog.abstraction.impl.common.task.TaskImpl;
+import org.globus.cog.abstraction.interfaces.JobSpecification;
+import org.globus.cog.abstraction.interfaces.Task;
+import org.globus.cog.abstraction.interfaces.TaskHandler;
+import org.globus.cog.util.ArgumentParser;
+import org.globus.cog.util.ArgumentParserException;
+import org.globus.cog.util.GridMap;
+import org.globus.gsi.GlobusCredential;
+import org.globus.gsi.GlobusCredentialException;
+import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
+import org.globus.gsi.gssapi.auth.SelfAuthorization;
+import org.globus.net.BaseServer;
+import org.globus.net.ServerSocketFactory;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+
+public class GSSService extends BaseServer implements Service {
+ private static final Logger logger = Logger.getLogger(Service.class);
+ private ServiceContext context = new ServiceContext(this);
+ private Thread serverThread;
+ private boolean restricted;
+ private URI contact;
+ private RequestManager requestManager;
+
+ public GSSService() throws IOException {
+ super();
+ }
+
+ public GSSService(GSSCredential cred) throws IOException {
+ this(cred, 0);
+ }
+
+ public GSSService(boolean secure, int port) throws IOException {
+ super(secure, port);
+ }
+
+ public GSSService(GSSCredential cred, int port) throws IOException {
+ super(cred, port);
+ }
+
+ public GSSService(GSSCredential cred, int port, InetAddress bindTo) throws IOException {
+ super(cred, port);
+ if (bindTo != null) {
+ this._server = ServerSocketFactory.getDefault().createServerSocket(port, 50, bindTo);
+ }
+ }
+
+ public GSSService(int port) throws IOException {
+ this(true, port);
+ }
+
+ public GSSService(boolean secure, int port, InetAddress bindTo) throws IOException {
+ super(secure, port);
+ if (bindTo != null) {
+ this._server.close();
+ this._server = ServerSocketFactory.getDefault().createServerSocket(port, 50, bindTo);
+ }
+ }
+
+ protected void setRequestManager(RequestManager requestManager) {
+ this.requestManager = requestManager;
+ }
+
+ public void initialize() {
+ // prevent the server from being started by BaseServer constructors
+ }
+
+ public static GSSCredential initializeCredentials(boolean personal, String hostcert,
+ String hostkey) throws GlobusCredentialException, GSSException {
+ if (!personal) {
+ return new GlobusGSSCredentialImpl(new GlobusCredential(hostcert != null ? hostcert
+ : "/etc/grid-security/hostcert.pem", hostkey != null ? hostkey
+ : "/etc/grid-security/hostkey.pem"), GSSCredential.INITIATE_AND_ACCEPT);
+ }
+ else {
+ return new GlobusGSSCredentialImpl(GlobusCredential.getDefaultCredential(),
+ GSSCredential.INITIATE_AND_ACCEPT);
+ }
+ }
+
+ protected void handleConnection(Socket socket) {
+ logger.debug("Got connection");
+ try {
+ ConnectionHandler handler = new ConnectionHandler(this, socket, requestManager);
+ handler.start();
+ }
+ catch (Exception e) {
+ logger.warn("Could not start connection handler", e);
+ }
+ }
+
+ public URI getContact() {
+ return contact;
+ }
+
+ public void start() {
+ try {
+ contact = new URI(getProtocol(), null, getHost(), getPort(), null, null, null);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (serverThread == null) {
+ accept = true;
+ serverThread = new Thread(this);
+ serverThread.setName("Server: " + getContact());
+ serverThread.start();
+ }
+ }
+
+ public String toString() {
+ return String.valueOf(contact);
+ }
+
+ public boolean isRestricted() {
+ return restricted;
+ }
+
+ public void setRestricted(boolean restricted) {
+ this.restricted = restricted;
+ }
+
+ public ServiceContext getContext() {
+ return context;
+ }
+
+ public static void main(String[] args) {
+ ArgumentParser ap = new ArgumentParser();
+ ap.setExecutableName("cog-workflow-service");
+ ap.addFlag("personal", "Starts the service in personal mode. In personal mode, "
+ + "the service uses the user credential, and does not use a restricted "
+ + "execution environment. This is the default mode.");
+ ap.addFlag("shared", "Starts the service in shared mode. In shared mode, "
+ + "the service uses the host credentials, provides a restricted execution "
+ + "environment, and uses the grid-map file for authorization.");
+ ap.addFlag("nosec", "Starts the service without security");
+ ap.addFlag("local",
+ "Binds the socket to the local host. Connections from the network will not be possible.");
+ ap.addOption("port",
+ "Specifies the port that the service should be started on. The default is 1984.",
+ "port-number", ArgumentParser.OPTIONAL);
+ ap.addOption(
+ "gridmap",
+ "Specifies the location of the grid map file, which is used for "
+ + "mapping certificate distinguished names to local user accounts. This options is "
+ + "only meaningful if the service is started in shared mode. The default is "
+ + GridMap.DEFAULT_GRID_MAP, "file", ArgumentParser.OPTIONAL);
+ ap.addOption("hostcert", "Indicates the location of the host certificate. This option "
+ + "is only used in shared mode. The default is /etc/grid-security/hostcert.pem",
+ "file", ArgumentParser.OPTIONAL);
+ ap.addOption("hostkey", "Indicates the location of the host key. This option "
+ + "is only used in shared mode. The default is /etc/grid-security/hostkey.pem",
+ "file", ArgumentParser.OPTIONAL);
+ ap.addFlag("help", "Displays usage information");
+ ap.addAlias("port", "p");
+ try {
+ ap.parse(args);
+ }
+ catch (ArgumentParserException e) {
+ ap.usage();
+ System.exit(1);
+ }
+ if (ap.isPresent("help")) {
+ ap.usage();
+ System.exit(1);
+ }
+
+ int port = 1984;
+ if (ap.isPresent("port")) {
+ port = ap.getIntValue("port");
+ }
+ boolean personal = true;
+ String hostcert = null;
+ String hostkey = null;
+ boolean bind = false;
+ boolean nosec = false;
+ if (ap.isPresent("shared")) {
+ if (ap.isPresent("gridmap")) {
+ System.setProperty("grid.mapfile", ap.getStringValue("gridmap"));
+ }
+ personal = false;
+ if (ap.isPresent("personal")) {
+ System.err.println("-shared and -personal are mutually exclusive");
+ System.exit(1);
+ }
+ if (ap.isPresent("hostcert")) {
+ hostcert = ap.getStringValue("hostcert");
+ }
+ if (ap.isPresent("hostkey")) {
+ hostkey = ap.getStringValue("hostkey");
+ }
+ Task task = new TaskImpl();
+ JobSpecification spec = new JobSpecificationImpl();
+ task.setType(Task.JOB_SUBMISSION);
+ spec.setExecutable("test");
+ task.setSpecification(spec);
+ try {
+ TaskHandler handler = AbstractionFactory.newExecutionTaskHandler("local");
+ handler.submit(task);
+ throw new Exception();
+ }
+ catch (InvalidSecurityContextException e) {
+ // This means that it's the gridmapped local provider
+ }
+ catch (Exception e) {
+ System.err.println("The service will not run in shared mode with the default local provider.");
+ System.err.println("Please install the grid-mapped local provider");
+ System.exit(2);
+ }
+ }
+ if (ap.isPresent("nosec")) {
+ nosec = true;
+ }
+ if (ap.isPresent("local")) {
+ bind = true;
+ }
+
+ try {
+ GSSService service;
+ InetAddress bindTo = null;
+ if (bind) {
+ bindTo = InetAddress.getLocalHost();
+ }
+ if (nosec) {
+ service = new GSSService(false, port, bindTo);
+ }
+ else {
+ service = new GSSService(initializeCredentials(personal, hostcert, hostkey), port,
+ bindTo);
+ }
+ if (personal) {
+ service.setAuthorization(new SelfAuthorization());
+ service.setRestricted(false);
+ }
+ else {
+ service.setAuthorization(new GridMapAuthorization());
+ service.setRestricted(true);
+ }
+ if (bind) {
+ service.getContext().setLocal(true);
+ }
+ service.start();
+ System.out.println("Service started on port " + service.getPort());
+ while (true) {
+ Thread.sleep(10000);
+ }
+ }
+ catch (Exception e) {
+ System.err.println("Could not start service:\n\t" + e.getMessage());
+ System.exit(1);
+ }
+ }
+}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestManager.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestManager.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestManager.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -14,4 +14,6 @@
public interface RequestManager {
RequestHandler handleInitialRequest(byte[] data)
throws NoSuchHandlerException;
+
+ void addHandler(String name, Class cls);
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/RequestReply.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -72,6 +72,15 @@
this.addOutData(str.getBytes());
}
+ protected void addOutData(int value) {
+ byte[] b = new byte[4];
+ b[0] = (byte) (value & 0xff);
+ b[1] = (byte) ((value >> 8) & 0xff);
+ b[2] = (byte) ((value >> 16) & 0xff);
+ b[3] = (byte) ((value >> 24) & 0xff);
+ addOutData(b);
+ }
+
protected void sendError(String error) throws ProtocolException {
sendError(error, null);
}
@@ -98,8 +107,10 @@
public abstract void send() throws ProtocolException;
- protected abstract void dataReceived(byte[] data) throws ProtocolException;
+ protected void dataReceived(byte[] data) throws ProtocolException {
+ }
+
protected synchronized void addInData(byte[] data) {
synchronized (this) {
if (inData == null) {
@@ -146,9 +157,18 @@
if (inData == null) {
return null;
}
- return (byte[]) inData.get(index);
+ try {
+ return (byte[]) inData.get(index);
+ }
+ catch (IndexOutOfBoundsException e) {
+ throw new IllegalArgumentException("Missing command argument #" + (index + 1));
+ }
}
+ public String getInDataAsString(int index) {
+ return new String(getInData(index));
+ }
+
public synchronized void setInData(int index, byte[] data) {
if (inData == null) {
inData = new LinkedList();
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Service.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Service.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/Service.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -5,252 +5,18 @@
//----------------------------------------------------------------------
/*
- * Created on Jul 18, 2005
+ * Created on Feb 14, 2008
*/
package org.globus.cog.karajan.workflow.service;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
+import java.net.URI;
-import org.apache.log4j.Logger;
-import org.globus.cog.abstraction.impl.common.AbstractionFactory;
-import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
-import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
-import org.globus.cog.abstraction.impl.common.task.TaskImpl;
-import org.globus.cog.abstraction.interfaces.JobSpecification;
-import org.globus.cog.abstraction.interfaces.Task;
-import org.globus.cog.abstraction.interfaces.TaskHandler;
-import org.globus.cog.util.ArgumentParser;
-import org.globus.cog.util.ArgumentParserException;
-import org.globus.cog.util.GridMap;
-import org.globus.gsi.GlobusCredential;
-import org.globus.gsi.GlobusCredentialException;
-import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
-import org.globus.gsi.gssapi.auth.SelfAuthorization;
-import org.globus.net.BaseServer;
-import org.globus.net.ServerSocketFactory;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSException;
+public interface Service {
-public class Service extends BaseServer {
- private static final Logger logger = Logger.getLogger(Service.class);
- private ServiceContext context = new ServiceContext(this);
- private Thread serverThread;
- private boolean restricted;
+ boolean isRestricted();
- public Service(GSSCredential cred) throws IOException {
- this(cred, 0);
- }
+ URI getContact();
- public Service(boolean secure, int port) throws IOException {
- super(secure, port);
- super.initialize();
- }
+ ServiceContext getContext();
- public Service(GSSCredential cred, int port) throws IOException {
- super(cred, port);
- super.initialize();
- }
-
- public Service(GSSCredential cred, int port, InetAddress bindTo) throws IOException {
- super(cred, port);
- if (bindTo != null) {
- this._server = ServerSocketFactory.getDefault().createServerSocket(port, 50, bindTo);
- }
- super.initialize();
- }
-
- public Service(int port) throws IOException {
- this(true, port);
- }
-
- public Service(boolean secure, int port, InetAddress bindTo) throws IOException {
- super(secure, port);
- if (bindTo != null) {
- this._server.close();
- this._server = ServerSocketFactory.getDefault().createServerSocket(port, 50, bindTo);
- }
- super.initialize();
- }
-
- public void initialize() {
- // prevent the server from being started by BaseServer constructors
- }
-
- public static GSSCredential initializeCredentials(boolean personal, String hostcert,
- String hostkey) throws GlobusCredentialException, GSSException {
- if (!personal) {
- return new GlobusGSSCredentialImpl(new GlobusCredential(hostcert != null ? hostcert
- : "/etc/grid-security/hostcert.pem", hostkey != null ? hostkey
- : "/etc/grid-security/hostkey.pem"), GSSCredential.INITIATE_AND_ACCEPT);
- }
- else {
- return new GlobusGSSCredentialImpl(GlobusCredential.getDefaultCredential(),
- GSSCredential.INITIATE_AND_ACCEPT);
- }
- }
-
- protected void handleConnection(Socket socket) {
- logger.debug("Got connection");
- try {
- ConnectionHandler handler = new ConnectionHandler(this, socket);
- handler.start();
- }
- catch (Exception e) {
- logger.warn("Could not start connection handler", e);
- }
- }
-
- public String getContactString() {
- return "https://" + getHost() + ":" + getPort();
- }
-
- protected void start() {
- if (serverThread == null) {
- accept = true;
- serverThread = new Thread(this);
- serverThread.setName("Server: " + getContactString());
- serverThread.start();
- }
- }
-
- public boolean isRestricted() {
- return restricted;
- }
-
- public void setRestricted(boolean restricted) {
- this.restricted = restricted;
- }
-
- public ServiceContext getContext() {
- return context;
- }
-
- public static void main(String[] args) {
- ArgumentParser ap = new ArgumentParser();
- ap.setExecutableName("cog-workflow-service");
- ap.addFlag("personal", "Starts the service in personal mode. In personal mode, "
- + "the service uses the user credential, and does not use a restricted "
- + "execution environment. This is the default mode.");
- ap.addFlag("shared", "Starts the service in shared mode. In shared mode, "
- + "the service uses the host credentials, provides a restricted execution "
- + "environment, and uses the grid-map file for authorization.");
- ap.addFlag("nosec", "Starts the service without security");
- ap.addFlag("local",
- "Binds the socket to the local host. Connections from the network will not be possible.");
- ap.addOption("port",
- "Specifies the port that the service should be started on. The default is 1984.",
- "port-number", ArgumentParser.OPTIONAL);
- ap.addOption(
- "gridmap",
- "Specifies the location of the grid map file, which is used for "
- + "mapping certificate distinguished names to local user accounts. This options is "
- + "only meaningful if the service is started in shared mode. The default is "
- + GridMap.DEFAULT_GRID_MAP, "file", ArgumentParser.OPTIONAL);
- ap.addOption("hostcert", "Indicates the location of the host certificate. This option "
- + "is only used in shared mode. The default is /etc/grid-security/hostcert.pem",
- "file", ArgumentParser.OPTIONAL);
- ap.addOption("hostkey", "Indicates the location of the host key. This option "
- + "is only used in shared mode. The default is /etc/grid-security/hostkey.pem",
- "file", ArgumentParser.OPTIONAL);
- ap.addFlag("help", "Displays usage information");
- ap.addAlias("port", "p");
- try {
- ap.parse(args);
- }
- catch (ArgumentParserException e) {
- ap.usage();
- System.exit(1);
- }
- if (ap.isPresent("help")) {
- ap.usage();
- System.exit(1);
- }
-
- int port = 1984;
- if (ap.isPresent("port")) {
- port = ap.getIntValue("port");
- }
- boolean personal = true;
- String hostcert = null;
- String hostkey = null;
- boolean bind = false;
- boolean nosec = false;
- if (ap.isPresent("shared")) {
- if (ap.isPresent("gridmap")) {
- System.setProperty("grid.mapfile", ap.getStringValue("gridmap"));
- }
- personal = false;
- if (ap.isPresent("personal")) {
- System.err.println("-shared and -personal are mutually exclusive");
- System.exit(1);
- }
- if (ap.isPresent("hostcert")) {
- hostcert = ap.getStringValue("hostcert");
- }
- if (ap.isPresent("hostkey")) {
- hostkey = ap.getStringValue("hostkey");
- }
- Task task = new TaskImpl();
- JobSpecification spec = new JobSpecificationImpl();
- task.setType(Task.JOB_SUBMISSION);
- spec.setExecutable("test");
- task.setSpecification(spec);
- try {
- TaskHandler handler = AbstractionFactory.newExecutionTaskHandler("local");
- handler.submit(task);
- throw new Exception();
- }
- catch (InvalidSecurityContextException e) {
- // This means that it's the gridmapped local provider
- }
- catch (Exception e) {
- System.err.println("The service will not run in shared mode with the default local provider.");
- System.err.println("Please install the grid-mapped local provider");
- System.exit(2);
- }
- }
- if (ap.isPresent("nosec")) {
- nosec = true;
- }
- if (ap.isPresent("local")) {
- bind = true;
- }
-
- try {
- Service service;
- InetAddress bindTo = null;
- if (bind) {
- bindTo = InetAddress.getLocalHost();
- }
- if (nosec) {
- service = new Service(false, port, bindTo);
- }
- else {
- service = new Service(initializeCredentials(personal, hostcert, hostkey), port,
- bindTo);
- }
- if (personal) {
- service.setAuthorization(new SelfAuthorization());
- service.setRestricted(false);
- }
- else {
- service.setAuthorization(new GridMapAuthorization());
- service.setRestricted(true);
- }
- if (bind) {
- service.getContext().setLocal(true);
- }
- service.start();
- System.out.println("Service started on port " + service.getPort());
- while (true) {
- Thread.sleep(10000);
- }
- }
- catch (Exception e) {
- System.err.println("Could not start service:\n\t" + e.getMessage());
- System.exit(1);
- }
- }
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ServiceContext.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ServiceContext.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/ServiceContext.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -38,6 +38,7 @@
}
public UserContext getUserContext(String name, ChannelContext channelContext) {
+ //TODO this doesn't make much sense
synchronized(users) {
String sname = String.valueOf(name);
UserContext uc = (UserContext) users.get(sname);
Added: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UDPService.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UDPService.java (rev 0)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UDPService.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -0,0 +1,137 @@
+//----------------------------------------------------------------------
+//This code is developed as part of the Java CoG Kit project
+//The terms of the license can be found at http://www.cogkit.org/license
+//This message may not be removed or altered.
+//----------------------------------------------------------------------
+
+/*
+ * Created on Jul 18, 2005
+ */
+package org.globus.cog.karajan.workflow.service;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.globus.cog.karajan.workflow.service.channels.ChannelContext;
+import org.globus.cog.karajan.workflow.service.channels.ChannelException;
+import org.globus.cog.karajan.workflow.service.channels.ChannelManager;
+import org.globus.cog.karajan.workflow.service.channels.UDPChannel;
+
+public class UDPService implements Service, Runnable {
+ private static final Logger logger = Logger.getLogger(Service.class);
+
+ public static final int BUFFER_SIZE = 2048;
+
+ private ServiceContext context = new ServiceContext(this);
+ private Thread serverThread;
+ private boolean restricted;
+ private Map channels;
+ private DatagramSocket socket;
+ private RequestManager rm;
+ private byte[] recvbuf;
+ private boolean shutdownFlag;
+ private URI contact;
+
+ public UDPService(RequestManager rm) throws IOException {
+ this.rm = rm;
+ channels = new HashMap();
+ }
+
+ public URI getContact() {
+ return contact;
+ }
+
+ public void start() throws ChannelException {
+ recvbuf = new byte[BUFFER_SIZE];
+ try {
+ socket = new DatagramSocket();
+ InetSocketAddress addr = (InetSocketAddress) socket.getLocalSocketAddress();
+ contact = new URI("udp", null, InetAddress.getLocalHost().getHostAddress(),
+ addr.getPort(), null, null, null);
+
+ }
+ catch (Exception e) {
+ throw new ChannelException(e);
+ }
+ serverThread = new Thread(this);
+ serverThread.setName("UDP Service");
+ serverThread.start();
+ }
+
+ public void run() {
+ DatagramPacket dp = new DatagramPacket(recvbuf, recvbuf.length);
+ try {
+ while (!shutdownFlag) {
+ socket.receive(dp);
+ InetSocketAddress addr = (InetSocketAddress) dp.getSocketAddress();
+ UDPChannel channel;
+ synchronized (channels) {
+ channel = (UDPChannel) channels.get(addr);
+ if (channel == null) {
+ ChannelContext cc = new ChannelContext(context);
+ channel = new UDPChannel(socket, cc, rm, this, addr);
+ channels.put(addr, channel);
+ ChannelManager.getManager().registerChannel(
+ "udp://" + addr.getAddress().getHostAddress() + ":" + (addr.getPort() + 1), null, channel);
+ }
+ }
+ byte[] buf = new byte[dp.getLength()];
+ System.arraycopy(recvbuf, 0, buf, 0, dp.getLength());
+ try {
+ channel.dataReceived(buf);
+ }
+ catch (ChannelException e) {
+ logger.warn("Channel failed to process incoming message", e);
+ synchronized (channels) {
+ channels.remove(addr);
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public String toString() {
+ return String.valueOf(getContact());
+ }
+
+ public boolean isRestricted() {
+ return restricted;
+ }
+
+ public void setRestricted(boolean restricted) {
+ this.restricted = restricted;
+ }
+
+ public ServiceContext getContext() {
+ return context;
+ }
+
+ public void channelShutDown(UDPChannel channel) {
+ synchronized (channels) {
+ channels.remove(channel.getRemoteAddress());
+ }
+ }
+
+ public static void main(String[] args) {
+ try {
+ RequestManager rm = new ServiceRequestManager();
+ UDPService s = new UDPService(new ServiceRequestManager());
+ s.start();
+ System.err.println(s.getContact());
+ Thread.sleep(100000000);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UserContext.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UserContext.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/UserContext.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -75,6 +75,9 @@
}
}
+ /**
+ * Returns the channel context of the channel that created this user context
+ */
public ChannelContext getChannelContext() {
return channelContext;
}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractKarajanChannel.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -12,13 +12,15 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
+import java.net.URI;
import java.util.LinkedList;
import java.util.List;
import org.apache.log4j.Logger;
+import org.globus.cog.karajan.workflow.service.NoSuchHandlerException;
import org.globus.cog.karajan.workflow.service.ProtocolException;
import org.globus.cog.karajan.workflow.service.RequestManager;
+import org.globus.cog.karajan.workflow.service.Service;
import org.globus.cog.karajan.workflow.service.commands.Command;
import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
@@ -30,6 +32,8 @@
private final RequestManager requestManager;
private final List registeredMaps;
private boolean localShutdown, closed;
+ private String name;
+ private Service callbackService;
protected AbstractKarajanChannel(RequestManager requestManager, ChannelContext channelContext) {
if (channelContext != null) {
@@ -66,7 +70,7 @@
public void sendTaggedData(int tag, boolean fin, byte[] data) {
if (logger.isDebugEnabled()) {
- logger.debug(this + "REQ>: tag = " + tag + ", fin = " + fin + ", datalen = "
+ logger.debug(this + " REQ>: tag = " + tag + ", fin = " + fin + ", datalen = "
+ data.length + ", data = " + ppByteBuf(data));
}
sendTaggedData(tag, fin ? FINAL_FLAG : 0, data);
@@ -74,7 +78,7 @@
public void sendTaggedReply(int tag, byte[] data, boolean fin, boolean err) {
if (logger.isDebugEnabled()) {
- logger.debug(this + "REPL>: tag = " + tag + ", fin = " + fin + ", datalen = "
+ logger.debug(this + " REPL>: tag = " + tag + ", fin = " + fin + ", datalen = "
+ data.length + ", data = " + ppByteBuf(data));
}
int flags = REPLY_FLAG;
@@ -90,18 +94,10 @@
public ChannelContext getChannelContext() {
return context;
}
-
+
public void setChannelContext(ChannelContext context) {
this.context = context;
}
-
- protected void readFromStream(InputStream stream, ByteBuffer buf) throws IOException {
- int count = stream.read(buf.array());
- if (count == -1) {
- throw new EOFException("Connection closed");
- }
- buf.position(buf.position() + count);
- }
protected int readFromStream(InputStream stream, byte[] buf, int crt) throws IOException {
int count = stream.read(buf, crt, buf.length - crt);
@@ -111,6 +107,22 @@
return crt + count;
}
+ public static void pack(byte[] buf, int offset, int value) {
+ buf[offset] = (byte) (value & 0xff);
+ buf[offset + 1] = (byte) ((value >> 8) & 0xff);
+ buf[offset + 2] = (byte) ((value >> 16) & 0xff);
+ buf[offset + 3] = (byte) ((value >> 24) & 0xff);
+ }
+
+ public static int unpack(byte[] buf, int offset) {
+ int i = 0;
+ i += (buf[offset] & 0xff);
+ i += (buf[offset + 1] & 0xff) << 8;
+ i += (buf[offset + 2] & 0xff) << 16;
+ i += (buf[offset + 3] & 0xff) << 24;
+ return i;
+ }
+
public static String ppByteBuf(byte[] data) {
byte[] buf = new byte[Math.min(data.length, 256)];
for (int i = 0; i < buf.length; i++) {
@@ -156,7 +168,7 @@
public void close() {
closed = true;
}
-
+
public boolean isClosed() {
return closed;
}
@@ -172,4 +184,125 @@
public boolean isClient() {
return false;
}
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String toString() {
+ return getName();
+ }
+
+ public synchronized URI getCallbackURI() throws Exception {
+ ensureCallbackServiceStarted();
+ return getCallbackService().getContact();
+ }
+
+ public Service getCallbackService() {
+ return callbackService;
+ }
+
+ public void setCallbackService(Service callbackService) {
+ this.callbackService = callbackService;
+ }
+
+ protected void ensureCallbackServiceStarted() throws Exception {
+
+ }
+
+ protected void handleReply(int tag, boolean fin, boolean error, int len, byte[] data) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(this + " REPL<: tag = " + tag + ", fin = " + fin
+ + ", err = " + error + ", datalen = " + len + ", data = " + ppByteBuf(data));
+ }
+ Command cmd = getChannelContext().getRegisteredCommand(tag);
+ if (cmd != null) {
+ try {
+ cmd.replyReceived(data);
+ if (fin) {
+ if (error) {
+ cmd.errorReceived();
+ }
+ else {
+ cmd.receiveCompleted();
+ }
+ unregisterCommand(cmd);
+ }
+ }
+ catch (ProtocolException e) {
+ logger.warn("Exception caught while processing reply", e);
+ cmd.errorReceived(e.getMessage(), e);
+ }
+ }
+ else {
+ unregisteredSender(tag);
+ }
+ }
+
+ protected void unregisteredSender(int tag) {
+ logger.warn(getName() + " Recieved reply to unregistered sender. Tag: " + tag);
+ }
+
+ protected void handleRequest(int tag, boolean fin, boolean error, int len, byte[] data) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(this + " REQ<: tag = " + tag + ", fin = " + fin
+ + ", err = " + error + ", datalen = " + len + ", data = " + ppByteBuf(data));
+ }
+ RequestHandler handler = getChannelContext().getRegisteredHandler(tag);
+ try {
+ if (handler != null) {
+ handler.register(this);
+ handler.dataReceived(data);
+ }
+ else {
+ try {
+ handler = getRequestManager().handleInitialRequest(data);
+ handler.setId(tag);
+ registerHandler(handler, tag);
+ }
+ catch (NoSuchHandlerException e) {
+ logger.warn(getName() + "Could not handle request", e);
+ }
+
+ }
+ if (fin) {
+ try {
+ if (error) {
+ // TODO
+ }
+ else {
+ handler.receiveCompleted();
+ }
+ }
+ catch (ChannelIOException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Failed to process request", e);
+ }
+ if (!handler.isReplySent()) {
+ handler.sendError(e.toString(), e);
+ }
+ }
+ catch (Error e) {
+ if (!handler.isReplySent()) {
+ handler.sendError(e.toString(), e);
+ }
+ throw e;
+ }
+ finally {
+ unregisterHandler(tag);
+ }
+ }
+ }
+ catch (ProtocolException e) {
+ unregisterHandler(tag);
+ logger.warn(e);
+ }
+ }
}
Deleted: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractSocketChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractSocketChannel.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractSocketChannel.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -1,137 +0,0 @@
-//----------------------------------------------------------------------
-//This code is developed as part of the Java CoG Kit project
-//The terms of the license can be found at http://www.cogkit.org/license
-//This message may not be removed or altered.
-//----------------------------------------------------------------------
-
-/*
- * Created on Jul 21, 2006
- */
-package org.globus.cog.karajan.workflow.service.channels;
-
-import java.io.EOFException;
-import java.net.Socket;
-
-import org.globus.cog.karajan.workflow.service.RequestManager;
-
-public abstract class AbstractSocketChannel extends AbstractStreamKarajanChannel implements Runnable {
- private Socket socket;
- private boolean started;
- private Exception startException;
- private final boolean client;
- private boolean closing;
-
- public AbstractSocketChannel(RequestManager requestManager, ChannelContext channelContext,
- Socket socket, boolean client) {
- super(requestManager, channelContext);
- this.socket = socket;
- this.client = client;
- if (client) {
- setEndpoint("C(" + socket.getLocalAddress() + ")");
- }
- else {
- setEndpoint("S(" + socket.getLocalAddress() + ")");
- }
- }
-
- public synchronized void start() throws Exception {
- Thread thread = new Thread(this);
- thread.setDaemon(true);
- thread.setName("Chanel: " + getEndpoint());
- thread.start();
- while (!isStarted() && !isClosed() && startException == null) {
- try {
- wait();
- }
- catch (InterruptedException e) {
- }
- }
- if (startException != null) {
- logger.debug("Exception while starting channel", startException);
- throw startException;
- }
- logger.info(getEndpoint() + "Channel started");
- }
-
- public void run() {
- ChannelContext context = getChannelContext();
- try {
- try {
- setInputStream(socket.getInputStream());
- setOutputStream(socket.getOutputStream());
- started = true;
- }
- catch (Exception e) {
- startException = e;
- e.printStackTrace();
- return;
- }
- finally {
- synchronized (this) {
- notifyAll();
- }
- }
- initializeConnection();
- mainLoop();
- }
- catch (EOFException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Channel terminated", e);
- }
- context.notifyRegisteredListeners(e);
- }
- catch (Exception e) {
- if (!closing) {
- logger.warn("Exception in channel", e);
- context.notifyRegisteredListeners(e);
- }
- }
- finally {
- try {
- setLocalShutdown();
- ChannelManager.getManager().shutdownChannel(this);
- }
- catch (ShuttingDownException e) {
- logger.debug("Channel already shutting down");
- }
- catch (Exception e) {
- logger.warn(getEndpoint() + "Could not shutdown channel", e);
- }
- super.close();
- synchronized (this) {
- notify();
- }
- logger.info(getEndpoint() + "Channel terminated");
- }
- }
-
- protected void initializeConnection() {
-
- }
-
- public void close() {
- closing = true;
- try {
- if (!socket.isClosed()) {
- socket.close();
- logger.info(getEndpoint() + "Channel shut down");
- }
- }
- catch (Exception e) {
- logger.warn(getEndpoint() + "Failed to close socket", e);
- }
- super.close();
- }
-
- public boolean isClient() {
- return client;
- }
-
- public boolean isStarted() {
- return started;
- }
-
- public boolean isOffline() {
- return isClosed();
- }
-}
Modified: trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java
===================================================================
--- trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java 2008-04-22 13:57:26 UTC (rev 1965)
+++ trunk/current/src/cog/modules/karajan/src/org/globus/cog/karajan/workflow/service/channels/AbstractStreamKarajanChannel.java 2008-04-22 14:00:10 UTC (rev 1966)
@@ -12,31 +12,39 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
import org.apache.log4j.Logger;
-import org.globus.cog.karajan.workflow.service.NoSuchHandlerException;
-import org.globus.cog.karajan.workflow.service.ProtocolException;
import org.globus.cog.karajan.workflow.service.RequestManager;
-import org.globus.cog.karajan.workflow.service.commands.Command;
-import org.globus.cog.karajan.workflow.service.handlers.RequestHandler;
public abstract class AbstractStreamKarajanChannel extends AbstractKarajanChannel {
- public static final Logger logger = Logger.getLogger(AbstractStreamKarajanChannel.class);
-
+ public static final Logger logger = Logger.getLogger(AbstractStreamKarajanChannel.class);
+
+ public static final int STATE_IDLE = 0;
+ public static final int STATE_RECEIVING_DATA = 1;
+
+ public static final int HEADER_LEN = 12;
+
private InputStream inputStream;
private OutputStream outputStream;
- private String endpoint;
- private final ByteBuffer header;
- private final byte[] bheader;
-
- protected AbstractStreamKarajanChannel(RequestManager requestManager, ChannelContext channelContext) {
+ private URI contact;
+ private final byte[] rhdr;
+ private byte[] data;
+ private int dataPointer;
+ private int state, tag, flags, len;
+
+ protected AbstractStreamKarajanChannel(RequestManager requestManager,
+ ChannelContext channelContext) {
super(requestManager, channelContext);
- bheader = new byte[12];
- header = ByteBuffer.wrap(bheader);
+ rhdr = new byte[HEADER_LEN];
}
-
+
protected InputStream getInputStream() {
return inputStream;
}
@@ -53,146 +61,227 @@
this.outputStream = outputStream;
}
- public String getEndpoint() {
- return endpoint;
+ public URI getContact() {
+ return contact;
}
- public void setEndpoint(String endpoint) {
- this.endpoint = endpoint;
+ public void setContact(URI contact) {
+ this.contact = contact;
}
public synchronized void sendTaggedData(int tag, int flags, byte[] data) {
- header.clear();
- header.putInt(tag);
- header.putInt(flags);
- header.putInt(data.length);
- try {
- outputStream.write(bheader);
- outputStream.write(data);
- outputStream.flush();
+ getSender().enqueue(tag, flags, data, this);
+ }
+
+ protected boolean step() throws IOException {
+ int avail = inputStream.available();
+ if (avail == 0) {
+ return false;
}
- catch (IOException e) {
- throw new ChannelIOException(e);
+ if (state == STATE_IDLE && avail >= HEADER_LEN) {
+ readFromStream(inputStream, rhdr, 0);
+ tag = unpack(rhdr, 0);
+ flags = unpack(rhdr, 4);
+ len = unpack(rhdr, 8);
+ if (len > 20000) {
+ System.out.println("Big len: " + len);
+ }
+ data = new byte[len];
+ dataPointer = 0;
+ state = STATE_RECEIVING_DATA;
}
- }
-
-
- protected void mainLoop() throws IOException {
- ChannelContext context = getChannelContext();
- ByteBuffer header = ByteBuffer.allocate(12);
- while (!isClosed()) {
- header.clear();
- while (header.remaining() > 0) {
- readFromStream(inputStream, header);
- }
- header.rewind();
- IntBuffer iheader = header.asIntBuffer();
- int tag = iheader.get();
- int flags = iheader.get();
- int len = iheader.get();
- byte[] data = new byte[len];
- int crt = 0;
- while (crt < len) {
- crt = readFromStream(inputStream, data, crt);
- }
+ if (state == STATE_RECEIVING_DATA) {
+ while (avail > 0 && dataPointer < len) {
+ dataPointer += inputStream.read(data, dataPointer, Math.min(avail, len
+ - dataPointer));
+ avail = inputStream.available();
+ }
+ if (dataPointer == len) {
+ state = STATE_IDLE;
boolean fin = (flags & FINAL_FLAG) != 0;
boolean error = (flags & ERROR_FLAG) != 0;
if ((flags & REPLY_FLAG) != 0) {
// reply
- if (logger.isDebugEnabled()) {
- logger.debug(this + "REPL<: tag = " + tag + ", fin = " + fin + ", err = "
- + error + ", datalen = " + len + ", data = " + ppByteBuf(data));
- }
- Command cmd = context.getRegisteredCommand(tag);
-
- if (cmd != null) {
- cmd.replyReceived(data);
- if (fin) {
- if (error) {
- cmd.errorReceived();
- }
- else {
- cmd.receiveCompleted();
- }
- unregisterCommand(cmd);
- }
- }
- else {
- logger.warn(endpoint + "Recieved reply to unregistered sender. Tag: " + tag);
- }
+ handleReply(tag, fin, error, len, data);
}
else {
// request
- if (logger.isDebugEnabled()) {
- logger.debug(this + "REQ<: tag = " + tag + ", fin = " + fin + ", err = "
- + error + ", datalen = " + len + ", data = " + ppByteBuf(data));
+ handleRequest(tag, fin, error, len, data);
+ }
+ }
+ }
+ return true;
+ }
+
+ protected void register() {
+ getMultiplexer(FAST).register(this);
+ }
+
+ private static final int SENDER_COUNT = 1;
+ private static Sender[] sender;
+ private static int crtSender;
+
+ private static synchronized Sender getSender() {
+ if (sender == null) {
+ sender = new Sender[SENDER_COUNT];
+ for (int i = 0; i < SENDER_COUNT; i++) {
+ sender[i] = new Sender();
+ sender[i].start();
+ }
+ }
+ try {
+ return sender[crtSender++];
+ }
+ finally {
+ if (crtSender == SENDER_COUNT) {
+ crtSender = 0;
+ }
+ }
+ }
+
+ private static class SendEntry {
+ public final int tag, flags;
+ public final byte[] data;
+ public final AbstractStreamKarajanChannel channel;
+
+ public SendEntry(int tag, int flags, byte[] data, AbstractStreamKarajanChannel channel) {
+ this.tag = tag;
+ this.flags = flags;
+ this.data = data;
+ this.channel = channel;
+ }
+ }
+
+ private static class Sender extends Thread {
+ private final LinkedList queue;
+ private final byte[] shdr;
+
+ public Sender() {
+ super("Sender");
+ queue = new LinkedList();
+ setDaemon(true);
+ shdr = new byte[HEADER_LEN];
+ }
+
+ public synchronized void enqueue(int tag, int flags, byte[] data, AbstractStreamKarajanChannel channel) {
+ queue.addLast(new SendEntry(tag, flags, data, channel));
+ notify();
+ }
+
+ public void run() {
+ try {
+ SendEntry e;
+ while (true) {
+ synchronized (this) {
+ while (queue.isEmpty()) {
+ wait();
+ }
+ e = (SendEntry) queue.removeFirst();
}
- RequestHandler handler = context.getRegisteredHandler(tag);
try {
- if (handler != null) {
- handler.register(this);
- handler.dataReceived(data);
- if (fin) {
- try {
- handler.receiveCompleted();
- }
- catch (ChannelIOException e) {
- throw e;
- }
- catch (Exception e) {
- if (!handler.isReplySent()) {
- handler.sendError(e.toString(), e);
- }
- }
- catch (Error e) {
- if (!handler.isReplySent()) {
- handler.sendError(e.toString(), e);
- }
- throw e;
- }
- finally {
- unregisterHandler(tag);
- }
- }
+ send(e.tag, e.flags, e.data, e.channel.getOutputStream());
+ }
+ catch (Exception ex) {
+ ex.printStackTrace();
+ try {
+ e.channel.getChannelContext().getRegisteredCommand(e.tag).errorReceived(null, ex);
+ }
+ catch (Exception exx) {
+ logger.warn(exx);
+ }
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ // exit
+ }
+ }
+
+ private void send(int tag, int flags, byte[] data, OutputStream os) throws IOException {
+ pack(shdr, 0, tag);
+ pack(shdr, 4, flags);
+ pack(shdr, 8, data.length);
+ synchronized(os) {
+ os.write(shdr);
+ os.write(data);
+ os.flush();
+ }
+ }
+ }
+
+ private static final int MUX_COUNT = 2;
+ private static Multiplexer[] multiplexer;
+ public static final int FAST = 0;
+ public static final int SLOW = 1;
+
+...
[truncated message content] |