User: ovidiuf Date: 04/04/15 15:54:20 Added: src/main/org/jboss/jms/serverless ChannelState.java ConnectionState.java Destinations.java GroupConnection.java GroupConnectionFactory.java GroupQueue.java GroupState.java GroupTopic.java MessageConsumerImpl.java MessageImpl.java MessageProducerImpl.java NotImplementedException.java ProviderException.java QueueCarrier.java QueueReceiverAddress.java QueueReceiverImpl.java QueueSenderImpl.java ServerAdminCommand.java SessionImpl.java SessionManager.java TextMessageImpl.java TopicPublisherImpl.java TopicSubscriberImpl.java Log: initial import of the Serverless JMS prototype formerly known as SLJMS 0.1.2 Revision Changes Path 1.1 jboss-jms/src/main/org/jboss/jms/serverless/ChannelState.java Index: ChannelState.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.jms.serverless; import org.jboss.logging.Logger; /** * An object whose lock is used to control the Connection Management Thread. Has a binary state * (open/not open). * * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ public class ChannelState { private static final Logger log = Logger.getLogger(ChannelState.class); private boolean open; public ChannelState() { open = false; } public synchronized boolean isOpen() { return open; } public synchronized boolean isNotOpen() { return !open; } public synchronized void setOpen(boolean b) { open = b; } } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/ConnectionState.java Index: ConnectionState.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.jms.serverless; import org.jboss.logging.Logger; /** * An object that holds the current state of a connection. 
The state could be one, and only one of * DISCONNECTED, STOPPED, STARTED, CLOSED. The instance's lock is used during the operations that * change the connection state. * * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ public class ConnectionState { private static final Logger log = Logger.getLogger(ConnectionState.class); public static final int DISCONNECTED = 0; public static final int STOPPED = 1; public static final int STARTED = 2; public static final int CLOSED = 3; private int state; public ConnectionState() { state = DISCONNECTED; } public synchronized boolean isDisconnected() { return state == DISCONNECTED; } public synchronized boolean isStopped() { return state == STOPPED; } public synchronized boolean isStarted() { return state == STARTED; } public synchronized boolean isClosed() { return state == CLOSED; } // No state consistency check is performed at this level. State changing methods should do // that and throw apropriate exceptions. public synchronized void setStopped() { state = STOPPED; } public synchronized void setStarted() { state = STARTED; } public synchronized void setClosed() { state = CLOSED; } public static String stateToString(ConnectionState cs) { return cs.state == DISCONNECTED ? "DISCONNECTED" : cs.state == STOPPED ? "STOPPED" : cs.state == STARTED ? "STARTED" : cs.state == CLOSED ? "CLOSED" : "UNKNOWN"; } } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/Destinations.java Index: Destinations.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.jms.serverless; import org.jboss.logging.Logger; import javax.jms.Topic; import javax.jms.JMSException; import javax.jms.Destination; /** * Collection of utilites to parse Destination names and generate Destination instances. 
* * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ public class Destinations { private Destinations() { } /** * The method expects to get the string representation of a GroupTopic or GroupQueue and * attempts to parse it and create the corresponding destination instance. A parsing error * generates a JMSException. * * @param s - the string representation of a Destination. * * TO_DO: doesn't handle null names consistently **/ public static Destination createDestination(String s) throws JMSException { // TO_DO: add test cases if (s == null) { throw new JMSException("null destination string representation"); } if (s.startsWith("GroupTopic[") && s.endsWith("]")) { String name = s.substring("GroupTopic[".length(), s.length() - 1); return new GroupTopic(name); } else if (s.startsWith("GroupQueue[") && s.endsWith("]")) { String name = s.substring("GroupQueue[".length(), s.length() - 1); return new GroupQueue(name); } throw new JMSException("invalid destination string representation: "+s); } /** * TO_DO: doesn't handle null names consistently * @exception JMSException - if handling the destination throws exception. **/ public static String stringRepresentation(Destination d) throws JMSException { if (d instanceof GroupTopic) { String name = ((GroupTopic)d).getTopicName(); return "GroupTopic["+name+"]"; } else if (d instanceof GroupQueue) { String name = ((GroupQueue)d).getQueueName(); return "GroupQueue["+name+"]"; } throw new JMSException("Unsupported destination: "+d); } } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/GroupConnection.java Index: GroupConnection.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. 
*/
package org.jboss.jms.serverless;

import org.jboss.logging.Logger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.ConnectionMetaData;
import javax.jms.ExceptionListener;
import javax.jms.ConnectionConsumer;
import javax.jms.ServerSessionPool;
import javax.jms.Destination;
import javax.jms.Topic;
import javax.jms.Session;
import org.jgroups.JChannel;
import org.jgroups.ChannelListener;
import org.jgroups.Channel;
import org.jgroups.Address;
import org.jgroups.ChannelException;
import java.io.Serializable;
import java.net.URL;
import javax.jms.Queue;
import org.jgroups.SetStateEvent;
import org.jgroups.util.Util;
import org.jgroups.GetStateEvent;
import org.jgroups.View;
import org.jgroups.SuspectEvent;
import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelNotConnectedException;

/**
 * The main piece of the JMS client runtime. Sits on top of a JChannel and maintains the "server
 * group" state. Delegates the session management to the SessionManager instance. Deals with
 * message delivery to and from sessions. Implements the Connection interface.
 *
 * The instance is also the Runnable executed by its own "Connection Management Thread".
 *
 * @author Ovidiu Feodorov <ov...@jb...>
 * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $
 *
 **/
class GroupConnection implements Connection, Runnable {

    private static final Logger log = Logger.getLogger(GroupConnection.class);

    // name of the JGroups group the server channel connects to
    private static final String DEFAULT_SERVER_GROUP_NAME = "serverGroup";

    // location of the XML channel configuration, handed to the JChannel constructor
    private URL serverChannelConfigURL;
    // creates the sessions; shares deliveryQueue with this connection
    private SessionManager sessionManager;
    // incoming messages accepted by dispatch() are added here
    private org.jgroups.util.Queue deliveryQueue;
    // DISCONNECTED / STOPPED / STARTED / CLOSED holder
    private ConnectionState connState;
    // private ChannelState channelState;
    // the queue receivers known to the group; sent and received as the JGroups state
    private GroupState groupState;
    // thread that runs this instance's run() method
    private Thread connManagementThread;
    // created and connected by connect()
    private JChannel serverChannel;

    /**
     * The constructor leaves the Connection in a DISCONNECTED state.
     *
     * @param serverChannelConfigURL the URL of the XML file containing channel configuration.
**/ GroupConnection(URL serverChannelConfigURL) { this.serverChannelConfigURL = serverChannelConfigURL; deliveryQueue = new org.jgroups.util.Queue(); sessionManager = new SessionManager(this, deliveryQueue); groupState = new GroupState(); connManagementThread = new Thread(this, "Connection Management Thread"); connState = new ConnectionState(); } /** * Initalizes the connection, by connecting the channel to the server group. Should be called * only once, when the Connection instance is created. **/ void connect() throws JMSException { // TO_DO: if is already connected (stopped), just return try { serverChannel = new JChannel(serverChannelConfigURL); serverChannel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); serverChannel.setChannelListener(new ChannelListener() { public void channelClosed(Channel channel) { log.debug("channelClosed("+channel+")"); } public void channelConnected(Channel channel) { log.debug("channelConnected() to group ["+ channel.getChannelName()+"]"); } public void channelDisconnected(Channel channel) { log.debug("channelDisconnected("+channel+")"); } public void channelReconnected(Address addr) { log.debug("channelReconnected("+addr+")"); } public void channelShunned() { log.debug("channelShunned()"); } }); log.debug("channel created"); serverChannel.connect(DEFAULT_SERVER_GROUP_NAME); log.debug("channel connected"); connState.setStopped(); connManagementThread.start(); log.debug("Connection Management Thread started"); boolean getStateOK = serverChannel.getState(null, 0); log.debug("getState(): "+getStateOK); } catch(ChannelException e) { String msg = "Failed to create an active connection"; log.error(msg, e); JMSException jmse = new JMSException(msg); jmse.setLinkedException(e); throw jmse; } } // TO_DO: deal with situation when this method is accessed concurrently from different threads void send(javax.jms.Message m) throws JMSException { try { // the Destination is already set for the message if (m.getJMSDestination() instanceof Topic) { 
// for topics, multicast serverChannel.send(null, null, (Serializable)m); } else { // for queues, unicast to the coordinator // TO_DO: optimization, if I am the only on in group, don't send the messages // down the stack anymore org.jgroups.Message jgmsg = new org.jgroups.Message((Address)serverChannel.getView().getMembers().get(0), null, new QueueCarrier(m)); serverChannel.send(jgmsg); } } catch(Exception e) { String msg = "Failed to send message"; log.error(msg, e); JMSException jmse = new JMSException(msg); jmse.setLinkedException(e); throw jmse; } } // // Runnable INTERFACE IMPLEMENTATION // /** * Code executed on the Connection Management Thread thread. It synchronously pulls JG * message and events from the channel. **/ public void run() { Object incoming = null; while(true) { try { incoming = serverChannel.receive(0); } catch(ChannelClosedException e) { log.debug("Channel closed, exiting"); break; } catch(ChannelNotConnectedException e) { log.warn("TO_DO: Channel not connected, I should block the thread ..."); continue; } catch(Exception e) { // TO_DO: use a JMS ExceptionListener and do some other things as well .... log.error("Failed to synchronously read from the channel", e); } try { dispatch(incoming); } catch(Exception e) { // TO_DO: I don't want that poorly written client code (dispatch() ends running // MessageListener code) to throw RuntimeException and terminate this thread // use the ExceptionListener and do some other things as well .... 
log.error("Dispatching failed", e); } } } // // // private void dispatch(Object o) throws Exception { log.debug("dispatching "+o); if (o instanceof SetStateEvent) { byte[] buffer = ((SetStateEvent)o).getArg(); if (buffer == null) { // that's ok if I am the coordinator, just ignore it log.debug("null group state, ignoring ..."); } else { // update my group state groupState.fromByteBuffer(buffer); } return; } else if (o instanceof GetStateEvent) { // somebody is requesting the group state serverChannel.returnState(groupState.toByteBuffer()); return; } else if (o instanceof View) { // no use for it for the time being return; } else if (o instanceof SuspectEvent) { // no use for it for the time being return; } else if (!(o instanceof org.jgroups.Message)) { // ignore it for the time being log.warn("Ignoring "+o); return; } org.jgroups.Message jgmsg = (org.jgroups.Message)o; Object payload = jgmsg.getObject(); if (payload instanceof ServerAdminCommand) { // ADD_QUEUE_RECEIVER, aso handleServerAdminCommand(jgmsg.getSrc(), (ServerAdminCommand)payload); } else if (payload instanceof QueueCarrier) { QueueCarrier qc = (QueueCarrier)payload; String sessionID = qc.getSessionID(); // this is either an initial queue carrier that forwards the message from its // source to the coordinator, or a final queue carrier that forwards the message // from the coordinator to its final destination. if (sessionID == null) { queueForward(qc); } else { deliveryQueue.add(qc); } } else if (payload instanceof javax.jms.Message) { // deliver only if the connection is started, discard otherwise if (connState.isStarted()) { deliveryQueue.add((javax.jms.Message)payload); } } else { log.warn("JG Message with a payload something else than a JMS Message: "+ (payload == null ? 
"null" : payload.getClass().getName())); } } private void handleServerAdminCommand(Address src, ServerAdminCommand c) { //log.debug("Handling "+c.getCommand()); String comm = c.getCommand(); if (ServerAdminCommand.ADD_QUEUE_RECEIVER.equals(comm)) { String queueName = (String)c.get(0); String sessionID = (String)c.get(1); String queueReceiverID = (String)c.get(2); groupState.addQueueReceiver(queueName, src, sessionID, queueReceiverID); } else if (ServerAdminCommand.REMOVE_QUEUE_RECEIVER.equals(comm)) { String queueName = (String)c.get(0); String sessionID = (String)c.get(1); String queueReceiverID = (String)c.get(2); groupState.removeQueueReceiver(queueName, src, sessionID, queueReceiverID); } else { log.error("Unknown server administration command: "+comm); } } void advertiseQueueReceiver(String queueName, String sessionID, String queueReceiverID, boolean isOn) throws ProviderException { try { // multicast the change, this will update my own state as well String cs = isOn ? ServerAdminCommand.ADD_QUEUE_RECEIVER : ServerAdminCommand.REMOVE_QUEUE_RECEIVER; ServerAdminCommand comm = new ServerAdminCommand(cs, queueName, sessionID, queueReceiverID); serverChannel.send(null, null, comm); } catch(ChannelException e) { throw new ProviderException("Failed to advertise the queue receiver", e); } } private void queueForward(QueueCarrier qc) throws Exception { Queue destQueue = (Queue)qc.getJMSMessage().getJMSDestination(); QueueReceiverAddress ra = groupState.selectReceiver(destQueue.getQueueName()); if (ra == null) { // TO_DO: no receivers for this queue, discard it for the time being log.warn("Discarding message for queue "+destQueue.getQueueName()+"!"); return; } Address destAddress = ra.getAddress(); qc.setSessionID(ra.getSessionID()); qc.setReceiverID(ra.getReceiverID()); // forward it to the final destination serverChannel.send(destAddress, null, qc); } // // Connection INTERFACE IMPLEMENTATION // public void start() throws JMSException { // makes sense to call it only 
a connection that is stopped. If called on a started // connection, the call is ignored. If called on a closed connection: TO_DO // TO_DO: throw apropriate exceptions for illegal transitions if (connState.isStarted()) { return; } synchronized(connState) { connState.setStarted(); connState.notify(); } } public void stop() throws JMSException { // TO_DO: throw apropriate exceptions for illegal transitions connState.setStopped(); } public void close() throws JMSException { // TO_DO: throw apropriate exceptions for illegal transitions // TO_DO: read the rest of specs and make sure I comply; tests if (connState.isClosed()) { return; } connState.setClosed(); serverChannel.close(); } public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { return sessionManager.createSession(transacted, acknowledgeMode); } public String getClientID() throws JMSException { throw new NotImplementedException(); } public void setClientID(String clientID) throws JMSException { // Once the connection has been initialized, the runtime provides a ClientID, that cannot // be changed by the user; according to JMS1.1 specs, the method should throw // IllegalStateException String msg = "ClientID ("+""+") cannot be modified"; throw new IllegalStateException(msg); } public ConnectionMetaData getMetaData() throws JMSException { throw new NotImplementedException(); } public ExceptionListener getExceptionListener() throws JMSException { throw new NotImplementedException(); } public void setExceptionListener(ExceptionListener listener) throws JMSException { throw new NotImplementedException(); } public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { throw new NotImplementedException(); } public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws 
JMSException { throw new NotImplementedException(); } // // END OF Connection INTERFACE IMPLEMENTATION // /** * Debugging only **/ public static void main(String[] args) throws Exception { GroupConnection c = new GroupConnection(new URL(args[0])); c.connect(); } } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/GroupConnectionFactory.java Index: GroupConnectionFactory.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.jms.serverless; import org.jboss.logging.Logger; import javax.jms.ConnectionFactory; import javax.jms.Connection; import javax.jms.JMSException; import java.net.URL; /** * * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ public class GroupConnectionFactory implements ConnectionFactory { private static final Logger log = Logger.getLogger(GroupConnectionFactory.class); private String stackConfigFileName; public GroupConnectionFactory(String stackConfigFileName) { this.stackConfigFileName = stackConfigFileName; } /** * The Connection is stopped, but it is active (ready to send and receive traffic), * which means the method throws an exception if the group cannot be contacted for some * reason. * * @see javax.jms.ConnectionFactory#createConnection() * **/ public Connection createConnection() throws JMSException { URL url = getClass().getClassLoader().getResource(stackConfigFileName); if (url == null) { String msg = "The channel configuration file (" + stackConfigFileName + ") not found! 
"+ "Make sure it is in classpath."; throw new JMSException(msg); } GroupConnection c = new GroupConnection(url); c.connect(); return c; } public String toString() { return getClass().getName().toString()+"@"+Integer.toHexString(hashCode())+"["+ stackConfigFileName+"]"; } /** * The Connection is stopped, but it is active (ready to send and receive traffic), * which means the method throws an exception if the group cannot be contacted for some * reason. * * @see javax.jms.ConnectionFactory#createConnection(String, String) **/ public Connection createConnection(String userName, String password) throws JMSException { throw new NotImplementedException(); } } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/GroupQueue.java Index: GroupQueue.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.jms.serverless; import org.jboss.logging.Logger; import javax.jms.Queue; import javax.jms.JMSException; /** * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ public class GroupQueue implements Queue { private static final Logger log = Logger.getLogger(GroupQueue.class); private String name; public GroupQueue(String name) { this.name = name; } public String getQueueName() throws JMSException { return name; } public String toString() { try { return Destinations.stringRepresentation(this); } catch(JMSException e) { return "Invalid GroupQueue"; } } public boolean equals(Object o) { if (this == o) { return true; } if (!(o instanceof GroupQueue)) { return false; } GroupQueue that = (GroupQueue)o; if (name == null) { return false; } return name.equals(that.name); } public int hashCode() { // TO_DO: review this if (name == null) { return 0; } return name.hashCode(); } } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/GroupState.java Index: GroupState.java 
=================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.jms.serverless; import org.jboss.logging.Logger; import org.jgroups.Address; import java.util.Map; import java.util.HashMap; import java.util.List; import java.util.ArrayList; import org.jgroups.util.Util; import java.util.Iterator; /** * * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ class GroupState { private static final Logger log = Logger.getLogger(GroupState.class); private Map queues; public synchronized byte[] toByteBuffer() throws Exception { return Util.objectToByteBuffer(queues); } public synchronized void fromByteBuffer(byte[] ba) throws Exception { Object o = Util.objectFromByteBuffer(ba); if (o == null) { queues = null; } else if (o instanceof Map) { queues = (Map)o; } else { throw new IllegalStateException("Invalid group state"); } } public synchronized void addQueueReceiver(String queueName, Address addr, String sessionID, String queueReceiverID) { if (queues == null) { queues = new HashMap(); } List l = (List)queues.get(queueName); if (l == null) { l = new ArrayList(); queues.put(queueName, l); } QueueReceiverAddress ra = new QueueReceiverAddress(addr, sessionID, queueReceiverID); if (l.contains(ra)) { log.warn(ra+" already in the group state"); return; } l.add(ra); log.debug("New GroupState: "+toString()); } /** * If no such queue receiver is found, the method logs the event as a warning. 
**/ public synchronized void removeQueueReceiver(String queueName, Address addr, String sessionID, String queueReceiverID) { String noSuchReceiverMsg = "No such queue receiver: "+queueName+"/"+addr+"/"+sessionID+"/"+queueReceiverID; List l = null; if (queues == null || ((l = (List)queues.get(queueName)) == null) || l.isEmpty()) { log.warn(noSuchReceiverMsg); } if (!l.remove(new QueueReceiverAddress(addr, sessionID, queueReceiverID))) { log.warn(noSuchReceiverMsg); } log.debug("New GroupState: "+toString()); } /** * Could return null if there is no receiver for the queue **/ public synchronized QueueReceiverAddress selectReceiver(String queueName) { if (queues == null) { return null; } List l = (List)queues.get(queueName); if (l == null || l.size() == 0) { return null; } QueueReceiverAddress selected = null; int crtidx = 0; for(Iterator i = l.iterator(); i.hasNext(); crtidx++) { QueueReceiverAddress crt = (QueueReceiverAddress)i.next(); if (crt.isNextForDelivery()) { selected = crt; crt.setNextForDelivery(false); ((QueueReceiverAddress)l.get((crtidx + 1) % l.size())).setNextForDelivery(true); break; } } if (selected == null) { selected = (QueueReceiverAddress)l.get(0); ((QueueReceiverAddress)l.get(1 % l.size())).setNextForDelivery(true);; } return selected; } public String toString() { return queues == null ? "null" : queues.toString(); } } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/GroupTopic.java Index: GroupTopic.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. 
*/ package org.jboss.jms.serverless; import org.jboss.logging.Logger; import javax.jms.Topic; import javax.jms.JMSException; /** * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ public class GroupTopic implements Topic { private static final Logger log = Logger.getLogger(GroupTopic.class); private String name; public GroupTopic(String name) { this.name = name; } public String getTopicName() throws JMSException { return name; } public String toString() { try { return Destinations.stringRepresentation(this); } catch(JMSException e) { return "Invalid GroupTopic"; } } public boolean equals(Object o) { if (this == o) { return true; } if (!(o instanceof GroupTopic)) { return false; } GroupTopic that = (GroupTopic)o; if (name == null) { return false; } return name.equals(that.name); } public int hashCode() { // TO_DO: review this if (name == null) { return 0; } return name.hashCode(); } } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/MessageConsumerImpl.java Index: MessageConsumerImpl.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. 
*/ package org.jboss.jms.serverless; import org.jboss.logging.Logger; import javax.jms.JMSException; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageConsumer; /** * * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ abstract class MessageConsumerImpl implements MessageConsumer { private static final Logger log = Logger.getLogger(MessageConsumerImpl.class); protected SessionImpl session; private MessageListener listener; private Destination destination; MessageConsumerImpl(SessionImpl session, Destination destination) { this.session = session; this.destination = destination; } Destination getDestination() { return destination; } // // MessageConsumer INTERFACE IMPLEMENTATION // public String getMessageSelector() throws JMSException { throw new NotImplementedException(); } public MessageListener getMessageListener() throws JMSException { return listener; } public void setMessageListener(MessageListener listener) throws JMSException { this.listener = listener; } public Message receive() throws JMSException { throw new NotImplementedException(); } public Message receive(long timeout) throws JMSException { throw new NotImplementedException(); } public Message receiveNoWait() throws JMSException { throw new NotImplementedException(); } public abstract void close() throws JMSException; } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/MessageImpl.java Index: MessageImpl.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. 
*/ package org.jboss.jms.serverless; import org.jboss.logging.Logger; import javax.jms.Message; import javax.jms.JMSException; import javax.jms.Destination; import java.util.Enumeration; import java.io.Serializable; /** * * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ class MessageImpl implements Message, Serializable { private static final Logger log = Logger.getLogger(MessageImpl.class); static final long serialVersionUID = 29880310721131848L; private String destination; public Destination getJMSDestination() throws JMSException { return Destinations.createDestination(destination); } public void setJMSDestination(Destination dest) throws JMSException { destination = Destinations.stringRepresentation(dest); } public String getJMSMessageID() throws JMSException { throw new NotImplementedException(); } public void setJMSMessageID(String id) throws JMSException { throw new NotImplementedException(); } public long getJMSTimestamp() throws JMSException { throw new NotImplementedException(); } public void setJMSTimestamp(long timestamp) throws JMSException { throw new NotImplementedException(); } public byte [] getJMSCorrelationIDAsBytes() throws JMSException { throw new NotImplementedException(); } public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException { throw new NotImplementedException(); } public void setJMSCorrelationID(String correlationID) throws JMSException { throw new NotImplementedException(); } public String getJMSCorrelationID() throws JMSException { throw new NotImplementedException(); } public Destination getJMSReplyTo() throws JMSException { throw new NotImplementedException(); } public void setJMSReplyTo(Destination replyTo) throws JMSException { throw new NotImplementedException(); } public int getJMSDeliveryMode() throws JMSException { throw new NotImplementedException(); } public void setJMSDeliveryMode(int deliveryMode) throws JMSException { throw new 
NotImplementedException(); } public boolean getJMSRedelivered() throws JMSException { throw new NotImplementedException(); } public void setJMSRedelivered(boolean redelivered) throws JMSException { throw new NotImplementedException(); } public String getJMSType() throws JMSException { throw new NotImplementedException(); } public void setJMSType(String type) throws JMSException { throw new NotImplementedException(); } public long getJMSExpiration() throws JMSException { throw new NotImplementedException(); } public void setJMSExpiration(long expiration) throws JMSException { throw new NotImplementedException(); } public int getJMSPriority() throws JMSException { throw new NotImplementedException(); } public void setJMSPriority(int priority) throws JMSException { throw new NotImplementedException(); } public void clearProperties() throws JMSException { throw new NotImplementedException(); } public boolean propertyExists(String name) throws JMSException { throw new NotImplementedException(); } public boolean getBooleanProperty(String name) throws JMSException { throw new NotImplementedException(); } public byte getByteProperty(String name) throws JMSException { throw new NotImplementedException(); } public short getShortProperty(String name) throws JMSException { throw new NotImplementedException(); } public int getIntProperty(String name) throws JMSException { throw new NotImplementedException(); } public long getLongProperty(String name) throws JMSException { throw new NotImplementedException(); } public float getFloatProperty(String name) throws JMSException { throw new NotImplementedException(); } public double getDoubleProperty(String name) throws JMSException { throw new NotImplementedException(); } public String getStringProperty(String name) throws JMSException { throw new NotImplementedException(); } public Object getObjectProperty(String name) throws JMSException { throw new NotImplementedException(); } public Enumeration getPropertyNames() throws 
JMSException { throw new NotImplementedException(); } public void setBooleanProperty(String name, boolean value) throws JMSException { throw new NotImplementedException(); } public void setByteProperty(String name, byte value) throws JMSException { throw new NotImplementedException(); } public void setShortProperty(String name, short value) throws JMSException { throw new NotImplementedException(); } public void setIntProperty(String name, int value) throws JMSException { throw new NotImplementedException(); } public void setLongProperty(String name, long value) throws JMSException { throw new NotImplementedException(); } public void setFloatProperty(String name, float value) throws JMSException { throw new NotImplementedException(); } public void setDoubleProperty(String name, double value) throws JMSException { throw new NotImplementedException(); } public void setStringProperty(String name, String value) throws JMSException { throw new NotImplementedException(); } public void setObjectProperty(String name, Object value) throws JMSException { throw new NotImplementedException(); } public void acknowledge() throws JMSException { throw new NotImplementedException(); } public void clearBody() throws JMSException { throw new NotImplementedException(); } } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/MessageProducerImpl.java Index: MessageProducerImpl.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. 
*/ package org.jboss.jms.serverless; import org.jboss.logging.Logger; import javax.jms.JMSException; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageProducer; /** * * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ class MessageProducerImpl implements MessageProducer { private static final Logger log = Logger.getLogger(MessageProducerImpl.class); private SessionImpl session; private Destination destination; MessageProducerImpl(SessionImpl session, Destination destination) { this.session = session; this.destination = destination; // TO_DO } // // MessageProducer INTERFACE IMPLEMENTATION // public void setDisableMessageID(boolean value) throws JMSException { throw new NotImplementedException(); } public boolean getDisableMessageID() throws JMSException { throw new NotImplementedException(); } public void setDisableMessageTimestamp(boolean value) throws JMSException { throw new NotImplementedException(); } public boolean getDisableMessageTimestamp() throws JMSException { throw new NotImplementedException(); } public void setDeliveryMode(int deliveryMode) throws JMSException { throw new NotImplementedException(); } public int getDeliveryMode() throws JMSException { throw new NotImplementedException(); } public void setPriority(int defaultPriority) throws JMSException { throw new NotImplementedException(); } public int getPriority() throws JMSException { throw new NotImplementedException(); } public void setTimeToLive(long timeToLive) throws JMSException { throw new NotImplementedException(); } public long getTimeToLive() throws JMSException { throw new NotImplementedException(); } public Destination getDestination() throws JMSException { return destination; } public void close() throws JMSException { throw new NotImplementedException(); } public void send(Message message) throws JMSException { message.setJMSDestination(destination); session.send(message); } public void send(Message 
message, int deliveryMode, int priority, long timeToLive) throws JMSException { throw new NotImplementedException(); } public void send(Destination destination, Message message) throws JMSException { throw new NotImplementedException(); } public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { throw new NotImplementedException(); } } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/NotImplementedException.java Index: NotImplementedException.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.jms.serverless; /** * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ public class NotImplementedException extends RuntimeException { public NotImplementedException() { super(); } public NotImplementedException(String message) { super(message); } public NotImplementedException(String message, Throwable cause) { super(message, cause); } public NotImplementedException(Throwable cause) { super(cause); } } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/ProviderException.java Index: ProviderException.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. 
*/
package org.jboss.jms.serverless;

/**
 * Checked exception type of the serverless JMS provider package. It only mirrors the four
 * standard Exception constructors.
 *
 * @author Ovidiu Feodorov <ov...@jb...>
 * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $
 *
 **/
public class ProviderException extends Exception {

    /** No message, no cause. **/
    public ProviderException() {
        super();
    }

    /** Wraps a cause without an explicit message. **/
    public ProviderException(Throwable cause) {
        super(cause);
    }

    /** Message only. **/
    public ProviderException(String message) {
        super(message);
    }

    /** Message and cause. **/
    public ProviderException(String message, Throwable cause) {
        super(message, cause);
    }
}

1.1 jboss-jms/src/main/org/jboss/jms/serverless/QueueCarrier.java
Index: QueueCarrier.java
===================================================================
/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.jms.serverless;

import org.jboss.logging.Logger;
import java.io.Serializable;

/**
 * Serializable envelope around a JMS message. Besides the message itself it carries a session
 * ID and a queue receiver ID, the extra information the provider uses to route the message to
 * its final destination. Both IDs may be null and can be set after construction; the wrapped
 * message cannot be replaced.
 *
 * @author Ovidiu Feodorov <ov...@jb...>
 * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $
 *
 **/
public class QueueCarrier implements Serializable {

    static final long serialVersionUID = 214803107211354L;

    private String sessionID;
    private String queueReceiverID;
    private javax.jms.Message jmsMessage;

    /**
     * Creates a carrier with no routing information: both IDs stay null.
     **/
    public QueueCarrier(javax.jms.Message jmsMessage) {
        // sessionID and queueReceiverID keep their default null value
        this.jmsMessage = jmsMessage;
    }

    /**
     * Creates a carrier addressed to a specific receiver of a specific session.
     **/
    public QueueCarrier(String sessionID, String queueReceiverID, javax.jms.Message jmsMessage) {
        this.jmsMessage = jmsMessage;
        this.queueReceiverID = queueReceiverID;
        this.sessionID = sessionID;
    }

    //
    // ROUTING INFORMATION
    //

    public String getSessionID() {
        return sessionID;
    }

    public void setSessionID(String id) {
        sessionID = id;
    }

    public String getReceiverID() {
        return queueReceiverID;
    }

    public void setReceiverID(String id) {
        queueReceiverID = id;
    }

    //
    // PAYLOAD
    //

    public javax.jms.Message getJMSMessage() {
        return jmsMessage;
    }
}

1.1 jboss-jms/src/main/org/jboss/jms/serverless/QueueReceiverAddress.java
Index: QueueReceiverAddress.java
=================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.jms.serverless; import org.jboss.logging.Logger; import org.jgroups.Address; import java.io.Serializable; /** * A wrapper around information that uniquely identifies a QueueReceiver in a group. * * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ public class QueueReceiverAddress implements Serializable { static final long serialVersionUID = 11480310721131223L; private static final Logger log = Logger.getLogger(QueueReceiverAddress.class); private Address addr; private String sessionID; private String queueReceiverID; private boolean nextForDelivery; public QueueReceiverAddress(Address addr, String sessionID, String queueReceiverID) { if (addr == null) { throw new NullPointerException("null address"); } if (sessionID == null) { throw new NullPointerException("null session ID"); } if (queueReceiverID == null) { throw new NullPointerException("null queue receiver ID"); } this.addr = addr; this.sessionID = sessionID; this.queueReceiverID = queueReceiverID; } public Address getAddress() { return addr; } public String getSessionID() { return sessionID; } public String getReceiverID() { return queueReceiverID; } public boolean isNextForDelivery() { return nextForDelivery; } public void setNextForDelivery(boolean b) { nextForDelivery = b; } public boolean equals(Object o) { if (this == o) { return true; } if (!(o instanceof QueueReceiverAddress)) { return false; } QueueReceiverAddress that = (QueueReceiverAddress)o; return (addr != null && addr.equals(that.addr)) && (sessionID != null && sessionID.equals(that.sessionID)) && (queueReceiverID != null && queueReceiverID.equals(that.queueReceiverID)); } public int hashCode() { // TO_DO return (addr == null ? 0 : addr.hashCode()) + (sessionID == null ? 
0 : sessionID.hashCode()) + (queueReceiverID == null ? 0 : queueReceiverID.hashCode()); } public String toString() { StringBuffer sb = new StringBuffer("QueueReceiverAddress["); sb.append(addr); sb.append("/sessionID="); sb.append(sessionID); sb.append("/receiverID="); sb.append(queueReceiverID); sb.append("]"); return sb.toString(); } } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/QueueReceiverImpl.java Index: QueueReceiverImpl.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.jms.serverless; import org.jboss.logging.Logger; import javax.jms.Queue; import javax.jms.QueueReceiver; import javax.jms.JMSException; import javax.jms.Destination; /** * * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver { private static final Logger log = Logger.getLogger(TopicSubscriberImpl.class); private String id; /** * @param id - the receiver id. The Session instance that owns this receiver instance * guarantees id uniqueness during its lifetime. **/ QueueReceiverImpl(SessionImpl session, String id, Queue queue) { super(session, queue); this.id = id; } public String getID() { return id; } // // MessageConsumer INTERFACE METHODS // public void close() throws JMSException { setMessageListener(null); session.removeConsumer(this); } // // QueueReceiver INTERFACE IMPLEMENTATION // public Queue getQueue() throws JMSException { return (Queue)getDestination(); } } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/QueueSenderImpl.java Index: QueueSenderImpl.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. 
*/ package org.jboss.jms.serverless; import org.jboss.logging.Logger; import javax.jms.JMSException; import javax.jms.Destination; import javax.jms.Message; import javax.jms.QueueSender; import javax.jms.Queue; /** * * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ class QueueSenderImpl extends MessageProducerImpl implements QueueSender { private static final Logger log = Logger.getLogger(QueueSenderImpl.class); QueueSenderImpl(SessionImpl session, Queue queue) { super(session, queue); // TO_DO } // // QueueSender INTERFACE IMPLEMENTATION // public Queue getQueue() throws JMSException { throw new NotImplementedException(); } // public void send(Message message) throws JMSException { // throw new NotImplementedException(); // } // public void send(Message message, int deliveryMode, int priority, long timeToLive) // throws JMSException { // throw new NotImplementedException(); // } public void send(Queue queue, Message message) throws JMSException { throw new NotImplementedException(); } public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { throw new NotImplementedException(); } } 1.1 jboss-jms/src/main/org/jboss/jms/serverless/ServerAdminCommand.java Index: ServerAdminCommand.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.jms.serverless; import java.io.Serializable; import java.util.List; import java.util.ArrayList; /** * TO_DO: change the name to GroupStateChange ? 
*
 * Serializable command made of a command name and a positional list of arguments. The names
 * defined here are ADD_QUEUE_RECEIVER and REMOVE_QUEUE_RECEIVER.
 *
 * @author Ovidiu Feodorov <ov...@jb...>
 * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $
 *
 **/
class ServerAdminCommand implements Serializable {

    static final long serialVersionUID = 33480310721131848L;

    public static final String ADD_QUEUE_RECEIVER = "ADD_QUEUE_RECEIVER";
    public static final String REMOVE_QUEUE_RECEIVER = "REMOVE_QUEUE_RECEIVER";

    private String comm;
    private List args;

    /**
     * @param comm - the command name.
     * @param args - the command arguments; the list is stored as is, not copied.
     **/
    public ServerAdminCommand(String comm, List args) {
        this.args = args;
        this.comm = comm;
    }

    /**
     * Convenience constructor for a command with exactly three String arguments, stored in
     * the order given.
     **/
    public ServerAdminCommand(String comm, String arg1, String arg2, String arg3) {
        this(comm, toList(arg1, arg2, arg3));
    }

    public String getCommand() {
        return comm;
    }

    /**
     * @return the argument at position i (zero-based).
     **/
    public Object get(int i) {
        return args.get(i);
    }

    /** Builds a fresh ArrayList holding the three values in order. **/
    private static List toList(String first, String second, String third) {
        List values = new ArrayList();
        values.add(first);
        values.add(second);
        values.add(third);
        return values;
    }
}

1.1 jboss-jms/src/main/org/jboss/jms/serverless/SessionImpl.java
Index: SessionImpl.java
===================================================================
/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
*/ package org.jboss.jms.serverless; import org.jboss.logging.Logger; import java.io.Serializable; import javax.jms.Session; import javax.jms.BytesMessage; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.JMSException; import javax.jms.TextMessage; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.MessageConsumer; import javax.jms.Destination; import javax.jms.Queue; import javax.jms.Topic; import javax.jms.TopicSubscriber; import javax.jms.QueueBrowser; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import java.util.List; import java.util.ArrayList; import java.util.Iterator; /** * * @author Ovidiu Feodorov <ov...@jb...> * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $ * **/ class SessionImpl implements Session { private static final Logger log = Logger.getLogger(SessionImpl.class); private SessionManager sessionManager; private String id; private List subscribers; private List receivers; private boolean transacted; private int acknowledgeMode; private int receiverCounter = 0; /** * @param id - the session id. The SessionManager instance guarantees uniqueness during its * lifetime. **/ SessionImpl(SessionManager sessionManager, String id, boolean transacted, int acknowledgeMode) { this.sessionManager = sessionManager; this.id = id; subscribers = new ArrayList(); receivers = new ArrayList(); this.transacted = transacted; this.acknowledgeMode = acknowledgeMode; if (transacted) { throw new NotImplementedException("Transacted sessions not supported"); } } public String getID() { return id; } void send(Message m) throws JMSException { sessionManager.getConnection().send(m); } /** * Delivery to topic subscribers. 
**/ // TO_DO: acknowledgement, deal with failed deliveries void deliver(Message m) { // TO_DO: single threaded access for sessions // So far, the only thread that accesses dispatch() is the connection's puller thread and // this will be the unique thread that accesses the Sessions. This may not be sufficient // for high load, consider the possiblity to (dynamically) add new threads to handle // delivery, possibly a thread per session. Destination destination = null; try { destination = m.getJMSDestination(); } catch(JMSException e) { // TO_DO: cannot deliver, a failure handler should take over log.error("Unhandled failure", e); return; } // TO_DO: properly handle the case when the destination is null for(Iterator i = subscribers.iterator(); i.hasNext(); ) { ... [truncated message content] |