[Ubermq-commits] jms/src/com/ubermq/jms/client/impl LocalTopicSubscriber.java,1.7,1.8
Brought to you by:
jimmyp
From: <ji...@us...> - 2002-08-30 16:00:16
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl In directory usw-pr-cvs1:/tmp/cvs-serv23826/src/com/ubermq/jms/client/impl Modified Files: LocalTopicSubscriber.java Log Message: durable ordering bug fix Index: LocalTopicSubscriber.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/LocalTopicSubscriber.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** LocalTopicSubscriber.java 30 Aug 2002 15:32:50 -0000 1.7 --- LocalTopicSubscriber.java 30 Aug 2002 16:00:12 -0000 1.8 *************** *** 60,65 **** /** ! * JMS properties ! */ private Topic t; private boolean noLocal; --- 60,65 ---- /** ! * JMS properties ! */ private Topic t; private boolean noLocal; *************** *** 71,76 **** /** ! * The session ! */ private TopicSession session; private IClientProcessor proc; --- 71,76 ---- /** ! * The session ! */ private TopicSession session; private IClientProcessor proc; *************** *** 85,191 **** /** ! * Construct a non-durable TopicSubscriber to the specified topic ! * ! * <P>A client uses a TopicSubscriber for receiving messages that have ! * been published to a topic. ! * ! * <P>Regular TopicSubscriber's are not durable. They only receive ! * messages that are published while they are active. ! * ! * <P>In some cases, a connection may both publish and subscribe to a ! * topic. The subscriber NoLocal attribute allows a subscriber to ! * inhibit the delivery of messages published by its own connection. ! * The default value for this attribute is false. ! * ! * @param session the session subscribe through ! * @param topic the topic to subscribe to ! * ! * @exception JMSException if a session fails to create a subscriber ! * due to some JMS error. ! * @exception javax.jms.InvalidDestinationException if invalid Topic specified. ! * ! * Please complete the missing tags for TopicSubscriber ! * @return ! * @throws ! * @pre ! * @post ! */ LocalTopicSubscriber(Topic t, ! String selector, ! boolean noLocal, ! TopicSession ts, ! IClientProcessor proc, ! IDeliveryManager delivery) ! throws JMSException { ! this.t = t; ! this.selector = selector; ! this.noLocal = noLocal; ! this.session = ts; ! this.proc = proc; ! this.delivery = delivery; ! this.durable = false; ! ! // setup receive q with priority ordering. ! init(); ! ! // register myself with the subscription router. ! proc.registerSubscription(t.getTopicName(), ! selector, ! this); } /** ! * This constructor creates (or re-creates) a durable subscription ! * with the given name. ! */ LocalTopicSubscriber(Topic t, ! String selector, ! boolean noLocal, ! String name, ! TopicSession ts, ! IClientProcessor proc, ! IDeliveryManager delivery) ! throws JMSException { ! this.t = t; ! this.selector = selector; ! this.noLocal = noLocal; ! this.session = ts; ! this.proc = proc; ! this.delivery = delivery; ! this.name = name; ! this.durable = true; ! ! // common setup ! init(); ! ! // register myself with the subscription router. ! proc.registerDurableSubscription(t.getTopicName(), ! name, ! selector, ! this); ! } ! ! private void init() { ! ! receiveQueue = new BoundedPriorityQueue(BOUNDED_BUFFER_SIZE, ! new Comparator() { ! ! public int compare(Object o1, Object o2) ! { ! try ! { ! int pri1 = ((Message)o1).getJMSPriority(), ! pri2 = ((Message)o2).getJMSPriority(); ! return pri1 - pri2; ! } ! catch (JMSException e) { ! return 0; ! } ! } ! }); } ! // // Implementation of TopicSubscriber Interface. --- 85,187 ---- /** ! * Construct a non-durable TopicSubscriber to the specified topic ! * ! * <P>A client uses a TopicSubscriber for receiving messages that have ! * been published to a topic. ! * ! * <P>Regular TopicSubscriber's are not durable. They only receive ! * messages that are published while they are active. ! * ! * <P>In some cases, a connection may both publish and subscribe to a ! * topic. The subscriber NoLocal attribute allows a subscriber to ! * inhibit the delivery of messages published by its own connection. ! * The default value for this attribute is false. ! * ! * @param session the session subscribe through ! * @param topic the topic to subscribe to ! * ! * @exception JMSException if a session fails to create a subscriber ! * due to some JMS error. ! * @exception javax.jms.InvalidDestinationException if invalid Topic specified. ! * ! * Please complete the missing tags for TopicSubscriber ! * @return ! * @throws ! * @pre ! * @post ! */ LocalTopicSubscriber(Topic t, ! String selector, ! boolean noLocal, ! TopicSession ts, ! IClientProcessor proc, ! IDeliveryManager delivery) ! throws JMSException { ! this.t = t; ! this.selector = selector; ! this.noLocal = noLocal; ! this.session = ts; ! this.proc = proc; ! this.delivery = delivery; ! this.durable = false; ! ! // setup receive q with priority ordering. ! receiveQueue = new BoundedPriorityQueue(BOUNDED_BUFFER_SIZE, ! new Comparator() { ! ! public int compare(Object o1, Object o2) ! { ! try ! { ! int pri1 = ((Message)o1).getJMSPriority(), ! pri2 = ((Message)o2).getJMSPriority(); ! return pri1 - pri2; ! } ! catch (JMSException e) { ! return 0; ! } ! } ! }); ! ! // register myself with the subscription router. ! proc.registerSubscription(t.getTopicName(), ! selector, ! this); } /** ! * This constructor creates (or re-creates) a durable subscription ! * with the given name. ! */ LocalTopicSubscriber(Topic t, ! String selector, ! boolean noLocal, ! String name, ! TopicSession ts, ! IClientProcessor proc, ! IDeliveryManager delivery) ! throws JMSException { ! this.t = t; ! this.selector = selector; ! this.noLocal = noLocal; ! this.session = ts; ! this.proc = proc; ! this.delivery = delivery; ! this.name = name; ! this.durable = true; ! ! // set up priority queue with no ordering to preserve the ! // assumption that acks go forward in time. ! receiveQueue = new BoundedBuffer(BOUNDED_BUFFER_SIZE); ! ! // register myself with the subscription router. ! proc.registerDurableSubscription(t.getTopicName(), ! name, ! selector, ! this); } ! // // Implementation of TopicSubscriber Interface. *************** *** 196,435 **** /** ! * Get the message consumer's MessageListener. ! * ! * @return the listener for the message consumer, or null if there isn't ! * one set. ! * ! * @exception JMSException if JMS fails to get message ! * listener due to some JMS error ! */ public MessageListener getMessageListener() throws JMSException { ! return messageListener; } /** ! * Set the message consumer's MessageListener. ! * ! * <P>Setting the message listener to null is the equivalent of ! * unsetting the message listener for the message consumer. ! * ! * <P>Calling the setMessageListener method of MessageConsumer ! * while messages are being consumed by an existing listener ! * or the consumer is being used to synchronously consume messages ! * is undefined. ! * ! * @param messageListener the messages are delivered to this listener ! */ public void setMessageListener( MessageListener listener ) ! throws JMSException { ! messageListener = listener; } /** ! * Receive the next message produced for this message consumer. ! * ! * <P>This call blocks indefinitely until a message is produced. ! * ! * <P>If this receive is done within a transaction, the message ! * remains on the consumer until the transaction commits. ! * ! * @exception JMSException if JMS fails to receive the next ! * message due to some error. ! */ public javax.jms.Message receive() ! throws JMSException { ! if(isClosing() || ! session.isClosing()) ! { ! throw new javax.jms.IllegalStateException("Subscriber is closed"); ! } ! ! try { ! javax.jms.Message m = (javax.jms.Message)receiveQueue.take(); ! return m; ! } catch(InterruptedException ie) { ! throw new JMSException(ie.toString()); ! } } /** ! * Receive the next message that arrives within the specified ! * timeout interval. ! * ! * <P>This call blocks until either a message arrives or the ! * timeout expires. ! * ! * @param timeout the timeout value (in milliseconds) ! * ! */ public javax.jms.Message receive( long timeout ) throws JMSException { ! if(isClosing() || ! session.isClosing()) ! { ! throw new javax.jms.IllegalStateException("Subscriber is closed"); ! } ! ! try { ! javax.jms.Message m = (javax.jms.Message)receiveQueue.poll(timeout); ! return m; ! } catch(InterruptedException ie) { ! throw new JMSException(ie.toString()); ! } } /** ! * Receive the next message if one is immediately available. ! * ! * @exception JMSException if JMS fails to receive the next ! * message due to some error. ! * @return the next message produced for this message consumer, or ! * null if one is not available. ! */ public javax.jms.Message receiveNoWait() throws JMSException { ! if(isClosing() || ! session.isClosing() ) ! { ! throw new javax.jms.IllegalStateException("Subscriber is closed"); ! } ! ! javax.jms.Message m = (javax.jms.Message)receiveQueue.peek(); ! return m; } /** ! * Since a provider may allocate some resources on behalf of a ! * MessageConsumer outside the JVM, clients should close them when they ! * are not needed. Relying on garbage collection to eventually reclaim ! * these resources may not be timely enough. ! * ! * <P>This call blocks until a receive or message listener in progress ! * has completed. A blocked message consumer receive call ! * returns null when this message consumer is closed. ! * ! * @exception JMSException if JMS fails to close the consumer ! * due to some error. ! */ public void close() throws JMSException { ! isClosing = true; ! if (durable) ! { ! proc.durableGoingAway(this.name); ! } } boolean isClosing() { ! return isClosing; } /** ! * IDatagramEndpoint method ! */ public void deliver(IDatagram d) { ! // wrap the datagram with a javax.jms.message ! // and queue it in our subscriber buffer ! IMessageDatagram md = (IMessageDatagram)d; ! javax.jms.Message msg = LocalMessage.getMessage(md, this); ! ! // do acknowledgement if we are auto ack mode. ! if (session.ackMode == TopicSession.AUTO_ACKNOWLEDGE) ! internalAcknowledge(md); ! ! // if this is a local delivery, check our ! // special flag and possibly ignore ! if (noLocal && ! session.conn.isSenderLocal(md.getSenderId())) ! { ! return; ! } ! ! // otherwise we forward it using our gap/dup detector. ! delivery.deliver(md.getSenderId(), ! md.getSequence(), ! msg, ! this); } private void internalAcknowledge(IMessageDatagram md) { ! if (durable) { ! session.conn.output(new AckDatagram(md.getMessageId(), false), ! new ExponentialBackoff()); ! } else { ! // we don't use acknolwedgements for non durable subscribers ! // because it is not meaningful ! } } public void acknowledge(IMessageDatagram md) { ! if (session.ackMode == TopicSession.AUTO_ACKNOWLEDGE) { ! // noop because we have already done this for the client ! } else if (session.ackMode == TopicSession.CLIENT_ACKNOWLEDGE) { ! internalAcknowledge(md); ! } else if (session.ackMode == TopicSession.DUPS_OK_ACKNOWLEDGE) { ! // never ack here. ! } } /** ! * This method is called by the managed forwarder to actually ! * enqueue the message for delivery. ! */ public void sendMessage(javax.jms.Message msg) { ! try { ! if (messageListener != null) ! session.asyncDelivery(msg, messageListener); ! else ! receiveQueue.put(msg); ! } catch(Exception ie) { ! com.ubermq.Utility.getLogger().throwing("", "", ie); ! } } synchronized void pause() { ! if (!isPaused) ! { ! isPaused = true; ! ! // accumulate the list of things ! pausedQueue = new ArrayList(BOUNDED_BUFFER_SIZE); ! ! Object o; ! while((o = receiveQueue.peek()) != null) ! { ! pausedQueue.add(o); ! } ! } } synchronized void resume() { ! if (isPaused) ! { ! isPaused = false; ! ! Iterator iter = pausedQueue.iterator(); ! while (iter.hasNext()) ! { ! try ! { ! receiveQueue.offer(iter.next(), 0); ! } ! catch (InterruptedException e) {} ! iter.remove(); ! } ! } } } --- 192,431 ---- /** ! * Get the message consumer's MessageListener. ! * ! * @return the listener for the message consumer, or null if there isn't ! * one set. ! * ! * @exception JMSException if JMS fails to get message ! * listener due to some JMS error ! */ public MessageListener getMessageListener() throws JMSException { ! return messageListener; } /** ! * Set the message consumer's MessageListener. ! * ! * <P>Setting the message listener to null is the equivalent of ! * unsetting the message listener for the message consumer. ! * ! * <P>Calling the setMessageListener method of MessageConsumer ! * while messages are being consumed by an existing listener ! * or the consumer is being used to synchronously consume messages ! * is undefined. ! * ! * @param messageListener the messages are delivered to this listener ! */ public void setMessageListener( MessageListener listener ) ! throws JMSException { ! messageListener = listener; } /** ! * Receive the next message produced for this message consumer. ! * ! * <P>This call blocks indefinitely until a message is produced. ! * ! * <P>If this receive is done within a transaction, the message ! * remains on the consumer until the transaction commits. ! * ! * @exception JMSException if JMS fails to receive the next ! * message due to some error. ! */ public javax.jms.Message receive() ! throws JMSException { ! if(isClosing() || ! session.isClosing()) ! { ! throw new javax.jms.IllegalStateException("Subscriber is closed"); ! } ! ! try { ! javax.jms.Message m = (javax.jms.Message)receiveQueue.take(); ! return m; ! } catch(InterruptedException ie) { ! throw new JMSException(ie.toString()); ! } } /** ! * Receive the next message that arrives within the specified ! * timeout interval. ! * ! * <P>This call blocks until either a message arrives or the ! * timeout expires. ! * ! * @param timeout the timeout value (in milliseconds) ! * ! */ public javax.jms.Message receive( long timeout ) throws JMSException { ! if(isClosing() || ! session.isClosing()) ! { ! throw new javax.jms.IllegalStateException("Subscriber is closed"); ! } ! ! try { ! javax.jms.Message m = (javax.jms.Message)receiveQueue.poll(timeout); ! return m; ! } catch(InterruptedException ie) { ! throw new JMSException(ie.toString()); ! } } /** ! * Receive the next message if one is immediately available. ! * ! * @exception JMSException if JMS fails to receive the next ! * message due to some error. ! * @return the next message produced for this message consumer, or ! * null if one is not available. ! */ public javax.jms.Message receiveNoWait() throws JMSException { ! if(isClosing() || ! session.isClosing() ) ! { ! throw new javax.jms.IllegalStateException("Subscriber is closed"); ! } ! ! javax.jms.Message m = (javax.jms.Message)receiveQueue.peek(); ! return m; } /** ! * Since a provider may allocate some resources on behalf of a ! * MessageConsumer outside the JVM, clients should close them when they ! * are not needed. Relying on garbage collection to eventually reclaim ! * these resources may not be timely enough. ! * ! * <P>This call blocks until a receive or message listener in progress ! * has completed. A blocked message consumer receive call ! * returns null when this message consumer is closed. ! * ! * @exception JMSException if JMS fails to close the consumer ! * due to some error. ! */ public void close() throws JMSException { ! isClosing = true; ! if (durable) ! { ! proc.durableGoingAway(this.name); ! } } boolean isClosing() { ! return isClosing; } /** ! * IDatagramEndpoint method ! */ public void deliver(IDatagram d) { ! // wrap the datagram with a javax.jms.message ! // and queue it in our subscriber buffer ! IMessageDatagram md = (IMessageDatagram)d; ! javax.jms.Message msg = LocalMessage.getMessage(md, this); ! ! // do acknowledgement if we are auto ack mode. ! if (session.ackMode == TopicSession.AUTO_ACKNOWLEDGE) ! internalAcknowledge(md); ! ! // if this is a local delivery, check our ! // special flag and possibly ignore ! if (noLocal && ! session.conn.isSenderLocal(md.getSenderId())) ! { ! return; ! } ! ! // otherwise we forward it using our gap/dup detector. ! delivery.deliver(md.getSenderId(), ! md.getSequence(), ! msg, ! this); } private void internalAcknowledge(IMessageDatagram md) { ! if (durable) { ! session.conn.output(new AckDatagram(md.getMessageId(), false), ! new ExponentialBackoff()); ! } else { ! // we don't use acknolwedgements for non durable subscribers ! // because it is not meaningful ! } } public void acknowledge(IMessageDatagram md) { ! if (session.ackMode == TopicSession.AUTO_ACKNOWLEDGE) { ! // noop because we have already done this for the client ! } else if (session.ackMode == TopicSession.CLIENT_ACKNOWLEDGE) { ! internalAcknowledge(md); ! } else if (session.ackMode == TopicSession.DUPS_OK_ACKNOWLEDGE) { ! // never ack here. ! } } /** ! * This method is called by the managed forwarder to actually ! * enqueue the message for delivery. ! */ public void sendMessage(javax.jms.Message msg) { ! try { ! if (messageListener != null) ! session.asyncDelivery(msg, messageListener); ! else ! receiveQueue.put(msg); ! } catch(Exception ie) { ! com.ubermq.Utility.getLogger().throwing("", "", ie); ! } } synchronized void pause() { ! if (!isPaused) ! { ! isPaused = true; ! ! // accumulate the list of things ! pausedQueue = new ArrayList(BOUNDED_BUFFER_SIZE); ! ! Object o; ! while((o = receiveQueue.peek()) != null) ! { ! pausedQueue.add(o); ! } ! } } synchronized void resume() { ! if (isPaused) ! { ! isPaused = false; ! ! Iterator iter = pausedQueue.iterator(); ! while (iter.hasNext()) ! { ! try ! { ! receiveQueue.offer(iter.next(), 0); ! } ! catch (InterruptedException e) {} ! iter.remove(); ! } ! } } } |