Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server
In directory sc8-pr-cvs1:/tmp/cvs-serv27783/src/com/ubermq/jms/server
Modified Files:
MessageServer.java
Log Message:
Various stability improvements, including better detection of SSL connection loss.
Index: MessageServer.java
===================================================================
RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/MessageServer.java,v
retrieving revision 1.44
retrieving revision 1.45
diff -C2 -d -r1.44 -r1.45
*** MessageServer.java 1 Jul 2003 14:11:51 -0000 1.44
--- MessageServer.java 13 Sep 2003 03:50:52 -0000 1.45
***************
*** 43,59 ****
{
private boolean started = false;
!
// clustering
private ClusterPropagation[] clusterCxn = new ClusterPropagation[CLUSTER_MAX_CXNS];
private int clusterCxnCount = 0;
!
// datagram processing & protocls
private IMessageProcessor datagramProcessor;
private Set protocols = new LinkedHashSet();
!
// connection and I/O management
private ConnectionList[] readLists, writeLists;
private ReadWriteTransformThread[] read, write;
!
public static final int DEFAULT_PORT = 3999;
private static final int CLUSTER_MAX_CXNS = 2;
--- 43,59 ----
{
private boolean started = false;
!
// clustering
private ClusterPropagation[] clusterCxn = new ClusterPropagation[CLUSTER_MAX_CXNS];
private int clusterCxnCount = 0;
!
// datagram processing & protocls
private IMessageProcessor datagramProcessor;
private Set protocols = new LinkedHashSet();
!
// connection and I/O management
private ConnectionList[] readLists, writeLists;
private ReadWriteTransformThread[] read, write;
!
public static final int DEFAULT_PORT = 3999;
private static final int CLUSTER_MAX_CXNS = 2;
***************
*** 64,68 ****
Configurator.getProperty(ServerConfig.DATAGRAM_FACTORY_CLASS, ServerDatagramFactory.class.getName());
private static final String DATAGRAM_INSTANCE_METHOD = "getInstance";
!
/**
* A pluggable protocol handler that can be attached to a message server
--- 64,68 ----
Configurator.getProperty(ServerConfig.DATAGRAM_FACTORY_CLASS, ServerDatagramFactory.class.getName());
private static final String DATAGRAM_INSTANCE_METHOD = "getInstance";
!
/**
* A pluggable protocol handler that can be attached to a message server
***************
*** 82,86 ****
*/
public URI getServiceURI();
!
/**
* Indicates if the protocol is enabled.
--- 82,86 ----
*/
public URI getServiceURI();
!
/**
* Indicates if the protocol is enabled.
***************
*** 89,93 ****
*/
public boolean isEnabled();
!
/**
* Starts the protocol. This method should
--- 89,93 ----
*/
public boolean isEnabled();
!
/**
* Starts the protocol. This method should
***************
*** 101,105 ****
public void start(IMessageProcessor dp, IConnectionInfo.ConnectionAcceptor a)
throws IOException;
!
/**
* Stops the protocol. If the protocol has not been started,
--- 101,105 ----
public void start(IMessageProcessor dp, IConnectionInfo.ConnectionAcceptor a)
throws IOException;
!
/**
* Stops the protocol. If the protocol has not been started,
***************
*** 108,112 ****
public void stop();
}
!
/**
* Resets the list of protocols to an empty set.<P>
--- 108,112 ----
public void stop();
}
!
/**
* Resets the list of protocols to an empty set.<P>
***************
*** 118,122 ****
protocols.clear();
}
!
/**
* Adds a protocol to the message server. This method must only be
--- 118,122 ----
protocols.clear();
}
!
/**
* Adds a protocol to the message server. This method must only be
***************
*** 129,133 ****
protocols.add(p);
}
!
/**
* Adds the standard protocols. This includes the default UberMQ protocol,
--- 129,133 ----
protocols.add(p);
}
!
/**
* Adds the standard protocols. This includes the default UberMQ protocol,
***************
*** 145,154 ****
SSLProtocol.getProtocolPort()));
}
!
public Set getProtocols()
{
return protocols;
}
!
/**
* Executes the message server. This message server potentially creates an
--- 145,154 ----
SSLProtocol.getProtocolPort()));
}
!
public Set getProtocols()
{
return protocols;
}
!
/**
* Executes the message server. This message server potentially creates an
***************
*** 159,168 ****
{
URI serviceURI = null;
!
try
{
// open journal
ISettingsRepository fj = new BinarySettingsRepository();
!
// start read/write threads
readLists = new ConnectionList[RW_THREAD_COUNT];
--- 159,168 ----
{
URI serviceURI = null;
!
try
{
// open journal
ISettingsRepository fj = new BinarySettingsRepository();
!
// start read/write threads
readLists = new ConnectionList[RW_THREAD_COUNT];
***************
*** 175,192 ****
Selector readSelector = Selector.open(),
writeSelector = Selector.open();
!
readLists[i] = new ConnectionList(readSelector);
writeLists[i] = new ConnectionList(writeSelector);
!
read[i] = new ReadWriteTransformThread(readSelector, readLists[i], SelectionKey.OP_READ);
read[i].start();
!
write[i] = new ReadWriteTransformThread(writeSelector, writeLists[i], SelectionKey.OP_WRITE);
write[i].start();
}
!
// create single datagram processor
this.datagramProcessor = createDatagramProc(fj);
!
// go through registered protocols
Iterator iter = protocols.iterator();
--- 175,192 ----
Selector readSelector = Selector.open(),
writeSelector = Selector.open();
!
readLists[i] = new ConnectionList(readSelector);
writeLists[i] = new ConnectionList(writeSelector);
!
read[i] = new ReadWriteTransformThread(readSelector, readLists[i], SelectionKey.OP_READ);
read[i].start();
!
write[i] = new ReadWriteTransformThread(writeSelector, writeLists[i], SelectionKey.OP_WRITE);
write[i].start();
}
!
// create single datagram processor
this.datagramProcessor = createDatagramProc(fj);
!
// go through registered protocols
Iterator iter = protocols.iterator();
***************
*** 196,200 ****
if (p.isEnabled())
{
! p.start(datagramProcessor, new IConnectionInfo.ConnectionAcceptor() {
public void acceptIncomingConnection(IConnectionInfo incoming)
{
--- 196,201 ----
if (p.isEnabled())
{
! p.start(datagramProcessor, new IConnectionInfo.ConnectionAcceptor()
! {
public void acceptIncomingConnection(IConnectionInfo incoming)
{
***************
*** 205,214 ****
});
com.ubermq.Utility.getLogger().info("Protocol " + p.toString() + " started");
!
if (serviceURI == null)
serviceURI = p.getServiceURI();
}
}
!
// we're done.
started = true;
--- 206,215 ----
});
com.ubermq.Utility.getLogger().info("Protocol " + p.toString() + " started");
!
if (serviceURI == null)
serviceURI = p.getServiceURI();
}
}
!
// we're done.
started = true;
***************
*** 218,222 ****
x.printStackTrace();
}
!
// set server name and return URi
if (datagramProcessor instanceof DatagramProc)
--- 219,223 ----
x.printStackTrace();
}
!
// set server name and return URi
if (datagramProcessor instanceof DatagramProc)
***************
*** 226,230 ****
return serviceURI;
}
!
/**
* Stops the server. This shuts down all I/O threads asynchronously, and waits
--- 227,231 ----
return serviceURI;
}
!
/**
* Stops the server. This shuts down all I/O threads asynchronously, and waits
***************
*** 242,246 ****
write[i].interrupt();
}
!
// stop all protocols
Iterator iter = protocols.iterator();
--- 243,247 ----
write[i].interrupt();
}
!
// stop all protocols
Iterator iter = protocols.iterator();
***************
*** 251,255 ****
}
}
!
/**
* Processes clustering announcements by creating a forwarding connection
--- 252,256 ----
}
}
!
/**
* Processes clustering announcements by creating a forwarding connection
***************
*** 267,271 ****
}
}
!
protected IMessageProcessor createDatagramProc(ISettingsRepository settings)
throws IOException
--- 268,272 ----
}
}
!
protected IMessageProcessor createDatagramProc(ISettingsRepository settings)
throws IOException
***************
*** 274,278 ****
return dp;
}
!
/**
* Returns the server's datagram processor.
--- 275,279 ----
return dp;
}
!
/**
* Returns the server's datagram processor.
***************
*** 284,288 ****
return datagramProcessor;
}
!
/**
* Returns the datagram factory holder to be used by this
--- 285,289 ----
return datagramProcessor;
}
!
/**
* Returns the datagram factory holder to be used by this
***************
*** 304,308 ****
ServerDatagramFactory.getInstance());
}
!
/**
* Returns the datagram factory holder that clients should
--- 305,309 ----
ServerDatagramFactory.getInstance());
}
!
/**
* Returns the datagram factory holder that clients should
***************
*** 317,321 ****
return DatagramFactory.getHolder();
}
!
/**
* Pipes are first class constructs in this message server implementation.
--- 318,322 ----
return DatagramFactory.getHolder();
}
!
/**
* Pipes are first class constructs in this message server implementation.
***************
*** 344,352 ****
datagramProcessor);
}
!
! protected void doConnectPipes(Pipe upstream,
! Pipe downstream,
! IDatagramFactory df,
! IMessageProcessor dp)
throws IOException
{
--- 345,353 ----
datagramProcessor);
}
!
! protected IConnectionInfo doConnectPipes(Pipe upstream,
! Pipe downstream,
! IDatagramFactory df,
! IMessageProcessor dp)
throws IOException
{
***************
*** 359,364 ****
readLists[0].push(ci);
writeLists[0].push(ci);
}
!
/**
* A standard port based protocol implementation. Can be started on
--- 360,366 ----
readLists[0].push(ci);
writeLists[0].push(ci);
+ return ci;
}
!
/**
* A standard port based protocol implementation. Can be started on
***************
*** 373,381 ****
String.valueOf(DEFAULT_PORT))).intValue();
}
!
private IDatagramFactory factory;
private int port;
private AcceptThread acceptThread;
!
public DefaultProtocol(IDatagramFactory factory, int port)
{
--- 375,383 ----
String.valueOf(DEFAULT_PORT))).intValue();
}
!
private IDatagramFactory factory;
private int port;
private AcceptThread acceptThread;
!
public DefaultProtocol(IDatagramFactory factory, int port)
{
***************
*** 383,392 ****
this.port = port;
}
!
public boolean isEnabled()
{
return true;
}
!
public void start(IMessageProcessor datagramProcessor,
IConnectionInfo.ConnectionAcceptor a)
--- 385,394 ----
this.port = port;
}
!
public boolean isEnabled()
{
return true;
}
!
public void start(IMessageProcessor datagramProcessor,
IConnectionInfo.ConnectionAcceptor a)
***************
*** 408,412 ****
}
}
!
public void stop()
{
--- 410,414 ----
}
}
!
public void stop()
{
***************
*** 418,427 ****
catch (InterruptedException e) {}
}
!
public String toString()
{
return port == DEFAULT_PORT ? "UberMQ" : ("UberMQ (" + port + ")");
}
!
/**
* Returns the service URI for this protocol.
--- 420,429 ----
catch (InterruptedException e) {}
}
!
public String toString()
{
return port == DEFAULT_PORT ? "UberMQ" : ("UberMQ (" + port + ")");
}
!
/**
* Returns the service URI for this protocol.
***************
*** 444,451 ****
}
}
!
!
}
!
/**
* The administration protocol implementation.
--- 446,453 ----
}
}
!
!
}
!
/**
* The administration protocol implementation.
***************
*** 460,471 ****
private final String ADMIN_SERVICE_NAME =
Configurator.getProperty(ServerConfig.ADMIN_SERVICE_NAME, "UberMQAdmin");
!
private URI serviceURI;
!
public boolean isEnabled()
{
return Boolean.valueOf(Configurator.getProperty(ServerConfig.ADMIN_ENABLE, "false")).booleanValue();
}
!
public void start(IMessageProcessor mp,
IConnectionInfo.ConnectionAcceptor a)
--- 462,473 ----
private final String ADMIN_SERVICE_NAME =
Configurator.getProperty(ServerConfig.ADMIN_SERVICE_NAME, "UberMQAdmin");
!
private URI serviceURI;
!
public boolean isEnabled()
{
return Boolean.valueOf(Configurator.getProperty(ServerConfig.ADMIN_ENABLE, "false")).booleanValue();
}
!
public void start(IMessageProcessor mp,
IConnectionInfo.ConnectionAcceptor a)
***************
*** 476,483 ****
RemoteAdminProxy admin = new RemoteAdminProxy((MessageServerAdmin)mp);
LocateRegistry.createRegistry(ADMIN_PORT);
!
Registry registry = LocateRegistry.getRegistry(ADMIN_PORT);
registry.rebind(ADMIN_SERVICE_NAME, admin);
!
serviceURI = URI.create("//" + InetAddress.getLocalHost().getHostName() +
":" + ADMIN_PORT + "/" + ADMIN_SERVICE_NAME);
--- 478,485 ----
RemoteAdminProxy admin = new RemoteAdminProxy((MessageServerAdmin)mp);
LocateRegistry.createRegistry(ADMIN_PORT);
!
Registry registry = LocateRegistry.getRegistry(ADMIN_PORT);
registry.rebind(ADMIN_SERVICE_NAME, admin);
!
serviceURI = URI.create("//" + InetAddress.getLocalHost().getHostName() +
":" + ADMIN_PORT + "/" + ADMIN_SERVICE_NAME);
***************
*** 490,503 ****
}
}
!
public void stop()
{
}
!
public String toString()
{
return "UberMQAdmin";
}
!
/**
* Returns the service URI for this protocol.
--- 492,505 ----
}
}
!
public void stop()
{
}
!
public String toString()
{
return "UberMQAdmin";
}
!
/**
* Returns the service URI for this protocol.
***************
*** 510,527 ****
return serviceURI;
}
!
!
}
!
public MessageServer(String[] args)
{
super(args.length > 0 ? args[0] : null);
}
!
public MessageServer(Properties props)
{
super(props);
}
!
/**
* Runs the UberMQ JMS server.<P>
--- 512,529 ----
return serviceURI;
}
!
!
}
!
public MessageServer(String[] args)
{
super(args.length > 0 ? args[0] : null);
}
!
public MessageServer(Properties props)
{
super(props);
}
!
/**
* Runs the UberMQ JMS server.<P>
***************
*** 538,547 ****
final MessageServer s = new MessageServer(args);
s.addStandardProtocols();
!
// run it
s.run();
com.ubermq.Utility.getLogger().info("UberMQ " + com.ubermq.jms.client.impl.Connection.UBERMQ_PROVIDER_VERSION + " running at " + s.getServiceUrl());
}
!
/**
* Returns the datagram factory to be used to interpret byte streams
--- 540,549 ----
final MessageServer s = new MessageServer(args);
s.addStandardProtocols();
!
// run it
s.run();
com.ubermq.Utility.getLogger().info("UberMQ " + com.ubermq.jms.client.impl.Connection.UBERMQ_PROVIDER_VERSION + " running at " + s.getServiceUrl());
}
!
/**
* Returns the datagram factory to be used to interpret byte streams
***************
*** 583,587 ****
}
}
!
return DatagramFactory.getInstance();
}
--- 585,589 ----
}
}
!
return DatagramFactory.getInstance();
}
|