[Ubermq-commits] jms/src/com/ubermq/jms/server MessageServer.java,1.12,1.13
Brought to you by:
jimmyp
From: <ji...@us...> - 2002-09-20 15:52:26
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server In directory usw-pr-cvs1:/tmp/cvs-serv24923/src/com/ubermq/jms/server Modified Files: MessageServer.java Log Message: some slight refactorings, an Ack datagram factory, plus some new regression stuff. Index: MessageServer.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/MessageServer.java,v retrieving revision 1.12 retrieving revision 1.13 diff -C2 -d -r1.12 -r1.13 *** MessageServer.java 16 Sep 2002 01:43:34 -0000 1.12 --- MessageServer.java 20 Sep 2002 15:52:22 -0000 1.13 *************** *** 1,151 **** ! package com.ubermq.jms.server; ! ! import com.ubermq.kernel.*; ! ! import com.ubermq.jms.server.cluster.ClusterPropagation; ! import com.ubermq.jms.server.datagram.impl.DatagramFactory; ! import com.ubermq.jms.server.journal.impl.FileJournal; ! import com.ubermq.jms.server.proc.DatagramProc; ! import java.nio.channels.Selector; ! import java.lang.reflect.*; ! ! /** ! * An executable class that provides typical message server ! * functionality for TCP connections. This includes ! * listening on a port, accepting new connections, ! * processing I/O for those connections, determining ! * which connections should receive messages, outputting ! * messages to receipient connections, managing unacknowledged ! * messages, providing durable subscriber and message selector ! * functionality, and managing overflow conditions. ! * <P> ! * A message server hub-and-spoke topology is very common ! * and applicable for many enterprise messaging ! * problems. ! */ ! public class MessageServer ! extends ClusteredServer ! { ! /** ! * Specifies the default UberMQ TCP port of 3999. ! */ ! public static final int DEFAULT_PORT = 3999; ! ! private boolean started = false; ! private ClusterPropagation[] clusterCxn = new ClusterPropagation[CLUSTER_MAX_CXNS]; ! private int clusterCxnCount = 0; ! ! private static final int CLUSTER_MAX_CXNS = 2; ! private static final String ALL_TOPICS = "#"; ! private static final int RW_THREAD_COUNT = ! Integer.valueOf(Configurator.getProperty(ServerConfig.RW_THREAD_COUNT, "1")).intValue(); ! private static final String DATAGRAM_FACTORY_CLASS = ! Configurator.getProperty(ServerConfig.DATAGRAM_FACTORY_CLASS, DatagramFactory.class.getName()); ! private static final String DATAGRAM_INSTANCE_METHOD = "getInstance"; ! ! public void exec() ! { ! try { ! Selector acceptSelector = Selector.open(); ! ! // open journal ! FileJournal fj = new FileJournal(); ! ! // start threads ! ConnectionList[] lists = new ConnectionList[RW_THREAD_COUNT]; ! com.ubermq.Utility.getLogger().fine("Creating read/write pool of size " + RW_THREAD_COUNT); ! for (int i = 0; i < RW_THREAD_COUNT; i++) ! { ! Selector readSelector = Selector.open(); ! ! ConnectionList cxns = new ConnectionList(readSelector); ! lists[i] = cxns; ! ! ReadWriteTransformThread rwtt = ! new ReadWriteTransformThread(readSelector, ! cxns); ! rwtt.start(); ! } ! ! new AcceptThread(acceptSelector, ! lists, ! Integer.valueOf(Configurator.getProperty(ConfigConstants.SERVER_PORT, ! String.valueOf(DEFAULT_PORT))).intValue(), ! getDatagramFactory(), ! new DatagramProc(fj, getDatagramFactory())).start(); ! ! started = true; ! } ! catch(Exception x) { ! com.ubermq.Utility.getLogger().throwing("", "", x); ! } ! } ! ! /** ! * Can be overridden by subclasses to provide a different wire protocol ! */ ! protected IDatagramFactory getDatagramFactory() ! { ! Class clazz = null; ! try ! { ! Class.forName(DATAGRAM_FACTORY_CLASS); ! return (IDatagramFactory)clazz.newInstance(); ! } ! catch (ClassNotFoundException e) { ! // fall through ! } ! catch (ClassCastException e) { ! // fall through ! } ! catch (Exception e) { ! try ! { ! Method getInstance = clazz.getMethod(DATAGRAM_INSTANCE_METHOD, null); ! return (IDatagramFactory)getInstance.invoke(null, null); ! } ! catch (Exception e2) { ! // fall through ! } ! } ! ! return DatagramFactory.getInstance(); ! } ! ! protected void onClusterAnnouncement(String url) ! { ! com.ubermq.Utility.getLogger().fine("cluster notification for " + url); ! if (started && ! clusterCxnCount < CLUSTER_MAX_CXNS) ! { ! // we'll try to actually connect to the other guy. ! clusterCxn[clusterCxnCount++] = ! new ClusterPropagation(getServiceUrl(), ! url); ! } ! } ! ! public MessageServer(String[] args) ! { ! super(args.length > 0 ? args[0] : null); ! } ! ! /** ! * Runs the UberMQ JMS server.<br> ! * 1. attempts to make an entry into a high availability cluster.<br> ! * 2. recovers from a checkpointed state.<br> ! * 3. performs I/O.<P> ! * The I/O Theory is that we read in information ! * i.e. messages that are then Transformed ! * into outgoing destination lists ! * or control metadata. ! */ ! public static void main(String[] args) ! { ! KernelBasedServer s = new MessageServer(args); ! s.run(); ! com.ubermq.Utility.getLogger().info("UberMQ 0.2 Running at " + s.getServiceUrl()); ! } ! ! ! } --- 1,154 ---- ! package com.ubermq.jms.server; ! ! import com.ubermq.kernel.*; ! ! import com.ubermq.jms.server.cluster.ClusterPropagation; ! import com.ubermq.jms.server.datagram.IAckDatagramFactory; ! import com.ubermq.jms.server.datagram.impl.DatagramFactory; ! import com.ubermq.jms.server.journal.impl.FileJournal; ! import com.ubermq.jms.server.proc.DatagramProc; ! import java.lang.reflect.Method; ! import java.nio.channels.Selector; ! ! /** ! * An executable class that provides typical message server ! * functionality for TCP connections. This includes ! * listening on a port, accepting new connections, ! * processing I/O for those connections, determining ! * which connections should receive messages, outputting ! * messages to receipient connections, managing unacknowledged ! * messages, providing durable subscriber and message selector ! * functionality, and managing overflow conditions. ! * <P> ! * A message server hub-and-spoke topology is very common ! * and applicable for many enterprise messaging ! * problems. ! */ ! public class MessageServer ! extends ClusteredServer ! { ! /** ! * Specifies the default UberMQ TCP port of 3999. ! */ ! public static final int DEFAULT_PORT = 3999; ! ! private boolean started = false; ! private ClusterPropagation[] clusterCxn = new ClusterPropagation[CLUSTER_MAX_CXNS]; ! private int clusterCxnCount = 0; ! ! private static final int CLUSTER_MAX_CXNS = 2; ! private static final String ALL_TOPICS = "#"; ! private static final int RW_THREAD_COUNT = ! Integer.valueOf(Configurator.getProperty(ServerConfig.RW_THREAD_COUNT, "1")).intValue(); ! private static final String DATAGRAM_FACTORY_CLASS = ! Configurator.getProperty(ServerConfig.DATAGRAM_FACTORY_CLASS, DatagramFactory.class.getName()); ! private static final String DATAGRAM_INSTANCE_METHOD = "getInstance"; ! ! public void exec() ! { ! try { ! Selector acceptSelector = Selector.open(); ! ! // open journal ! FileJournal fj = new FileJournal(); ! ! // start threads ! ConnectionList[] lists = new ConnectionList[RW_THREAD_COUNT]; ! com.ubermq.Utility.getLogger().fine("Creating read/write pool of size " + RW_THREAD_COUNT); ! for (int i = 0; i < RW_THREAD_COUNT; i++) ! { ! Selector readSelector = Selector.open(); ! ! ConnectionList cxns = new ConnectionList(readSelector); ! lists[i] = cxns; ! ! ReadWriteTransformThread rwtt = ! new ReadWriteTransformThread(readSelector, ! cxns); ! rwtt.start(); ! } ! ! new AcceptThread(acceptSelector, ! lists, ! Integer.valueOf(Configurator.getProperty(ConfigConstants.SERVER_PORT, ! String.valueOf(DEFAULT_PORT))).intValue(), ! getDatagramFactory(), ! new DatagramProc(fj, ! getDatagramFactory(), ! (IAckDatagramFactory)getDatagramFactory())).start(); ! ! started = true; ! } ! catch(Exception x) { ! com.ubermq.Utility.getLogger().throwing("", "", x); ! } ! } ! ! /** ! * Can be overridden by subclasses to provide a different wire protocol ! */ ! protected IDatagramFactory getDatagramFactory() ! { ! Class clazz = null; ! try ! { ! Class.forName(DATAGRAM_FACTORY_CLASS); ! return (IDatagramFactory)clazz.newInstance(); ! } ! catch (ClassNotFoundException e) { ! // fall through ! } ! catch (ClassCastException e) { ! // fall through ! } ! catch (Exception e) { ! try ! { ! Method getInstance = clazz.getMethod(DATAGRAM_INSTANCE_METHOD, null); ! return (IDatagramFactory)getInstance.invoke(null, null); ! } ! catch (Exception e2) { ! // fall through ! } ! } ! ! return DatagramFactory.getInstance(); ! } ! ! protected void onClusterAnnouncement(String url) ! { ! com.ubermq.Utility.getLogger().fine("cluster notification for " + url); ! if (started && ! clusterCxnCount < CLUSTER_MAX_CXNS) ! { ! // we'll try to actually connect to the other guy. ! clusterCxn[clusterCxnCount++] = ! new ClusterPropagation(getServiceUrl(), ! url); ! } ! } ! ! public MessageServer(String[] args) ! { ! super(args.length > 0 ? args[0] : null); ! } ! ! /** ! * Runs the UberMQ JMS server.<br> ! * 1. attempts to make an entry into a high availability cluster.<br> ! * 2. recovers from a checkpointed state.<br> ! * 3. performs I/O.<P> ! * The I/O Theory is that we read in information ! * i.e. messages that are then Transformed ! * into outgoing destination lists ! * or control metadata. ! */ ! public static void main(String[] args) ! { ! KernelBasedServer s = new MessageServer(args); ! s.run(); ! com.ubermq.Utility.getLogger().info("UberMQ 0.2 Running at " + s.getServiceUrl()); ! } ! ! ! } |