Update of /cvsroot/openjms/openjms/src/main/org/exolab/jms/server/mipc In directory sc8-pr-cvs1:/tmp/cvs-serv5826 Modified Files: IpcJmsServer.java IpcJmsSessionConnection.java IpcJmsSessionList.java IpcJmsSessionSender.java Log Message: Fixes for asynch message handling. Previously, if more than one javax.jms.Connection was opened on a multiplex connection, the subsequent closure of one of them (or its sessions) would cause the 'message' channel to close as well. Index: IpcJmsServer.java =================================================================== RCS file: /cvsroot/openjms/openjms/src/main/org/exolab/jms/server/mipc/IpcJmsServer.java,v retrieving revision 1.12 retrieving revision 1.13 diff -C2 -d -r1.12 -r1.13 *** IpcJmsServer.java 13 Jan 2003 13:44:08 -0000 1.12 --- IpcJmsServer.java 22 Jan 2003 23:39:54 -0000 1.13 *************** *** 173,180 **** throws IOException { IpcJmsSessionList list = ! (IpcJmsSessionList) _consumerList.get(session.getClientId()); if (list == null) { list = new IpcJmsSessionList(connection); ! _consumerList.put(session.getClientId(), list); } list.add(session); --- 173,180 ---- throws IOException { IpcJmsSessionList list = ! (IpcJmsSessionList) _consumerList.get(connection); if (list == null) { list = new IpcJmsSessionList(connection); ! _consumerList.put(connection, list); } list.add(session); *************** *** 185,195 **** * * @param session the session */ ! synchronized public void removeConnection(JmsServerSession session) { ! IpcJmsSessionList list = (IpcJmsSessionList)_consumerList.get ! (session.getClientId()); if (list != null) { if (list.remove(session)) { ! _consumerList.remove(session.getClientId()); } } --- 185,197 ---- * * @param session the session + * @param connection the connection to the client */ ! public synchronized void removeConnection( ! JmsServerSession session, MultiplexConnectionIfc connection) { ! IpcJmsSessionList list = (IpcJmsSessionList) _consumerList.get( ! connection); if (list != null) { if (list.remove(session)) { ! _consumerList.remove(connection); } } Index: IpcJmsSessionConnection.java =================================================================== RCS file: /cvsroot/openjms/openjms/src/main/org/exolab/jms/server/mipc/IpcJmsSessionConnection.java,v retrieving revision 1.18 retrieving revision 1.19 diff -C2 -d -r1.18 -r1.19 *** IpcJmsSessionConnection.java 18 Jan 2003 01:58:40 -0000 1.18 --- IpcJmsSessionConnection.java 22 Jan 2003 23:39:55 -0000 1.19 *************** *** 39,43 **** * OF THE POSSIBILITY OF SUCH DAMAGE. * ! * Copyright 2000 (C) Exoffice Technologies Inc. All Rights Reserved. * * $Id$ --- 39,43 ---- * OF THE POSSIBILITY OF SUCH DAMAGE. * ! * Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved. * * $Id$ *************** *** 117,121 **** Serializable result = null; if (func.equals("close")) { ! result = close(getSession(id, v)); } else if (func.equals("acknowledgeMessage")) { result = acknowledgeMessage(getSession(id, v), (Long)v.get(5), --- 117,121 ---- Serializable result = null; if (func.equals("close")) { ! result = close(getSession(id, v), getConnection(id)); } else if (func.equals("acknowledgeMessage")) { result = acknowledgeMessage(getSession(id, v), (Long)v.get(5), *************** *** 264,271 **** * */ ! protected Vector close(JmsServerSession session) { if (session != null) { try { ! IpcJmsServer.instance().removeConnection(session); session.close(); } catch (JMSException err) { --- 264,272 ---- * */ ! protected Vector close(JmsServerSession session, ! MultiplexConnectionIfc connection) { if (session != null) { try { ! IpcJmsServer.instance().removeConnection(session, connection); session.close(); } catch (JMSException err) { Index: IpcJmsSessionList.java =================================================================== RCS file: /cvsroot/openjms/openjms/src/main/org/exolab/jms/server/mipc/IpcJmsSessionList.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** IpcJmsSessionList.java 20 Jul 2001 07:53:56 -0000 1.4 --- IpcJmsSessionList.java 22 Jan 2003 23:39:55 -0000 1.5 *************** *** 39,43 **** * OF THE POSSIBILITY OF SUCH DAMAGE. * ! * Copyright 2000 (C) Exoffice Technologies Inc. All Rights Reserved. * * $Id$ --- 39,43 ---- * OF THE POSSIBILITY OF SUCH DAMAGE. * ! * Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved. * * $Id$ *************** *** 50,59 **** package org.exolab.jms.server.mipc; - import java.util.Enumeration; - import java.util.HashMap; import java.io.IOException; ! import org.exolab.jms.server.JmsServerSession; import org.exolab.core.mipc.MultiplexConnectionIfc; import org.exolab.core.mipc.ObjectChannel; --- 50,60 ---- package org.exolab.jms.server.mipc; import java.io.IOException; ! import java.util.HashMap; ! import java.util.Iterator; ! import org.exolab.core.mipc.MultiplexConnectionIfc; import org.exolab.core.mipc.ObjectChannel; + import org.exolab.jms.server.JmsServerSession; *************** *** 68,181 **** * @author <a href="mailto:mou...@ex...">Jim Mourikis</a> * @see org.exolab.jms.server.mipc.IpcJmsServer ! * @see org.exolab.jms.server.mipc.IpcJmsSessionSender ! * ! **/ ! ! public class IpcJmsSessionList ! { ! // The client connection. ! private ObjectChannel client_; ! // The list of session senders ! private HashMap list_; ! /** ! * Create a connection to the client listener for sending JmsMessages. ! * ! * @param connection The MultiplexConnection between client and server ! * @exception IOException If a connection error results. ! * ! */ ! public IpcJmsSessionList(MultiplexConnectionIfc connection) ! throws IOException ! { ! list_ = new HashMap(10); ! client_ = new ObjectChannel( "message", connection ); ! } ! ! ! /** ! * Add a new session sender for the client. If its not already active. ! * ! * @param session The session to add a notifier on ! * ! */ ! public void add(JmsServerSession session) ! { ! if (!list_.containsKey(session.getSessionId())) ! { ! list_.put(session.getSessionId(), ! new IpcJmsSessionSender(client_, session)); ! } ! } ! ! /** ! * Remove a sesson notifier. If this is the last session for this client ! * close the client connection. ! * ! * @param session The session to remove the notifier from ! * @return boolean True there are no more connections for this client ! * ! */ ! public boolean remove(JmsServerSession session) ! { ! IpcJmsSessionSender sender = ! (IpcJmsSessionSender)list_.remove(session.getSessionId()); ! boolean result = false; ! if (sender != null) ! { ! sender.close(); ! } ! if (list_.size() == 0) ! { ! closeConnection(); ! list_= null; ! result = true; ! } ! return result; ! } ! ! /** ! * Remove all session notfiers and close the client connection. ! * ! */ ! public void removeAll() ! { ! Object[] e = list_.values().toArray(); ! ! for (int i = 0, j = list_.size(); i < j; i++) ! { ! ((IpcJmsSessionSender)e[i]).close(); ! } ! closeConnection(); ! list_.clear(); ! list_= null; ! } ! // Close the connection to the client ! private void closeConnection() ! { ! if (client_ != null) ! { ! try ! { ! client_.close(); ! } ! catch(IOException err) ! { ! System.err.println("mipc.IpcJmsSessionList: Failed to close client connection\n" ! + err); ! } ! client_ = null; ! } ! } ! ! ! } // End IpcJmsSessionList --- 69,161 ---- * @author <a href="mailto:mou...@ex...">Jim Mourikis</a> * @see org.exolab.jms.server.mipc.IpcJmsServer ! * @see org.exolab.jms.server.mipc.IpcJmsSessionSender ! */ ! public class IpcJmsSessionList { ! /** ! * The client connection. ! */ ! private ObjectChannel _client; ! /** ! * The list of session senders ! */ ! private HashMap _list; + + /** + * Create a connection to the client listener for sending JmsMessages. + * + * @param connection The MultiplexConnection between client and server + * @exception IOException If a connection error results. + * + */ + public IpcJmsSessionList(MultiplexConnectionIfc connection) + throws IOException { + _list = new HashMap(10); + _client = new ObjectChannel("message", connection); + } ! /** ! * Add a new session sender for the client. If its not already active. ! * ! * @param session The session to add a notifier on ! */ ! public synchronized void add(JmsServerSession session) { ! if (!_list.containsKey(session.getSessionId())) { ! _list.put(session.getSessionId(), ! new IpcJmsSessionSender(_client, session)); ! } ! } ! /** ! * Remove a sesson notifier. If this is the last session for this client ! * close the client connection. ! * ! * @param session The session to remove the notifier from ! * @return boolean True there are no more connections for this client ! * ! */ ! public synchronized boolean remove(JmsServerSession session) { ! IpcJmsSessionSender sender = ! (IpcJmsSessionSender)_list.remove(session.getSessionId()); ! boolean result = false; ! if (sender != null) { ! sender.close(); ! } ! if (_list.isEmpty()) { ! closeConnection(); ! _list= null; ! result = true; ! } ! return result; ! } ! /** ! * Remove all session notfiers and close the client connection. ! */ ! public synchronized void removeAll() { ! Iterator iterator = _list.values().iterator(); ! while (iterator.hasNext()) { ! IpcJmsSessionSender sender = (IpcJmsSessionSender) iterator.next(); ! sender.close(); ! } ! _list.clear(); ! closeConnection(); ! _list= null; ! } + // Close the connection to the client + private void closeConnection() { + if (_client != null) { + try { + _client.close(); + } catch(IOException ignore) { + } + _client = null; + } + } ! } //-- IpcJmsSessionList Index: IpcJmsSessionSender.java =================================================================== RCS file: /cvsroot/openjms/openjms/src/main/org/exolab/jms/server/mipc/IpcJmsSessionSender.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** IpcJmsSessionSender.java 18 Jan 2003 02:04:29 -0000 1.7 --- IpcJmsSessionSender.java 22 Jan 2003 23:39:55 -0000 1.8 *************** *** 39,43 **** * OF THE POSSIBILITY OF SUCH DAMAGE. * ! * Copyright 2000 (C) Exoffice Technologies Inc. All Rights Reserved. * * $Id$ --- 39,43 ---- * OF THE POSSIBILITY OF SUCH DAMAGE. * ! * Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved. * * $Id$ *************** *** 53,64 **** import javax.jms.Message; import java.io.IOException; - import org.exolab.jms.server.JmsServerSession; import org.exolab.core.mipc.ObjectChannel; - import org.exolab.jms.message.MessageImpl; import org.exolab.jms.client.JmsMessageListener; /** ! * This class conatins the ipc connection to a receiver or subscriber * for passing the JmsMessages. * --- 53,65 ---- import javax.jms.Message; import java.io.IOException; import org.exolab.core.mipc.ObjectChannel; import org.exolab.jms.client.JmsMessageListener; + import org.exolab.jms.message.MessageImpl; + import org.exolab.jms.server.ClientDisconnectionException; + import org.exolab.jms.server.JmsServerSession; /** ! * This class contains the ipc connection to a receiver or subscriber * for passing the JmsMessages. * *************** *** 68,77 **** * @author <a href="mailto:mou...@ex...">Jim Mourikis</a> * @see org.exolab.jms.server.mipc.IpcJmsSessionConnection ! * @see org.exolab.jms.server.JmsServerSession ! * @see org.exolab.core.ipc.Client * ! **/ ! ! public class IpcJmsSessionSender implements JmsMessageListener { // The client connection. --- 69,76 ---- * @author <a href="mailto:mou...@ex...">Jim Mourikis</a> * @see org.exolab.jms.server.mipc.IpcJmsSessionConnection ! * @see org.exolab.jms.server.JmsServerSession ! * @see org.exolab.core.ipc.Client * ! */ public class IpcJmsSessionSender implements JmsMessageListener { // The client connection. *************** *** 81,85 **** private JmsServerSession session_ = null; - /** * Create a connection to the client listener for sending JmsMessages. --- 80,83 ---- *************** *** 89,93 **** * */ ! public IpcJmsSessionSender(ObjectChannel client, JmsServerSession session) { client_ = client; session_ = session; --- 87,92 ---- * */ ! public IpcJmsSessionSender(ObjectChannel client, ! JmsServerSession session) { client_ = client; session_ = session; *************** *** 95,106 **** } - /** * Send a JmsMessage to a listener. * * @param message The message to send. - * */ ! public void onMessage(Message message) { try { if (client_ != null) { --- 94,103 ---- } /** * Send a JmsMessage to a listener. * * @param message The message to send. */ ! public synchronized void onMessage(Message message) { try { if (client_ != null) { *************** *** 115,126 **** // calls. I could synchronize the inside of the channel // but decided to do it right here for now. ! synchronized(client_) { ! client_.send(v); ! Vector reply = (Vector) client_.receive(); ! } } } catch (Exception err) { ! throw new org.exolab.jms.server.ClientDisconnectionException ! (err.getMessage()); } } --- 112,120 ---- // calls. I could synchronize the inside of the channel // but decided to do it right here for now. ! client_.send(v); ! Vector reply = (Vector) client_.receive(); } } catch (Exception err) { ! throw new ClientDisconnectionException(err.getMessage()); } } *************** *** 132,136 **** * @param messages - collection of MessageImpl objects */ ! public void onMessages(Vector messages) { try { if (client_ != null) { --- 126,130 ---- * @param messages - collection of MessageImpl objects */ ! public synchronized void onMessages(Vector messages) { try { if (client_ != null) { *************** *** 144,156 **** // calls. I could synchronize the inside of the channel // but decided to do it right here for now. ! synchronized(client_) { ! client_.send(v); ! Vector reply = (Vector) client_.receive(); ! } } } catch (Exception err) { ! // System.err.println("Failure sending message" + err); ! throw new org.exolab.jms.server.ClientDisconnectionException ! (err.getMessage()); } } --- 138,146 ---- // calls. I could synchronize the inside of the channel // but decided to do it right here for now. ! client_.send(v); ! Vector reply = (Vector) client_.receive(); } } catch (Exception err) { ! throw new ClientDisconnectionException(err.getMessage()); } } *************** *** 161,165 **** * @param clinet - the client identity */ ! public void onMessageAvailable(long clientId) { try { if (client_ != null) { --- 151,155 ---- * @param clinet - the client identity */ ! public synchronized void onMessageAvailable(long clientId) { try { if (client_ != null) { *************** *** 173,182 **** // calls. I could synchronize the inside of the channel // but decided to do it right here for now. ! synchronized(client_) { ! client_.send(v); ! } } ! } catch (IOException err) { ! throw new org.exolab.jms.server.ClientDisconnectionException(err.getMessage()); } } --- 163,171 ---- // calls. I could synchronize the inside of the channel // but decided to do it right here for now. ! client_.send(v); ! Vector reply = (Vector) client_.receive(); } ! } catch (Exception err) { ! throw new ClientDisconnectionException(err.getMessage()); } } *************** *** 184,198 **** /** * Remove the callback from the JmsServerSession. - * */ ! public void close() { if (session_ != null) { session_.setMessageListener(null); session_ = null; } - client_ = null; } ! ! } // End IpcJmsSessionSender --- 173,184 ---- /** * Remove the callback from the JmsServerSession. */ ! public synchronized void close() { if (session_ != null) { session_.setMessageListener(null); session_ = null; } client_ = null; } ! } //-- IpcJmsSessionSender |