From: Timothy F. <ti...@jb...> - 2006-03-23 19:24:03
|
User: timfox Date: 06/03/23 14:23:57 Modified: src/main/org/jboss/jms/client/remoting MessageCallbackHandler.java Log: Multiple fixes, mainly race conditions. Refactoring of message callback handler Server now runs without error under extreme stress and paging Revision Changes Path 1.61 +425 -261 jboss-jms/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java (In the diff below, changes in quantity of whitespace are not shown.) Index: MessageCallbackHandler.java =================================================================== RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java,v retrieving revision 1.60 retrieving revision 1.61 diff -u -b -r1.60 -r1.61 --- MessageCallbackHandler.java 22 Mar 2006 10:23:32 -0000 1.60 +++ MessageCallbackHandler.java 23 Mar 2006 19:23:56 -0000 1.61 @@ -21,6 +21,11 @@ */ package org.jboss.jms.client.remoting; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + import javax.jms.JMSException; import javax.jms.MessageListener; import javax.jms.Session; @@ -28,20 +33,18 @@ import org.jboss.jms.delegate.ConsumerDelegate; import org.jboss.jms.delegate.SessionDelegate; import org.jboss.jms.message.MessageProxy; -import org.jboss.jms.util.JBossJMSException; import org.jboss.logging.Logger; import org.jboss.remoting.callback.HandleCallbackException; -import EDU.oswego.cs.dl.util.concurrent.Executor; -import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; -import EDU.oswego.cs.dl.util.concurrent.SynchronousChannel; +import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; +import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor; /** * @author <a href="mailto:ov...@jb...">Ovidiu Feodorov</a> * @author <a href="mailto:ti...@jb...">Tim Fox/a> - * @version <tt>$Revision: 1.60 $</tt> + * @version <tt>$Revision: 1.61 $</tt> * - * $Id: MessageCallbackHandler.java,v 1.60 2006/03/22 10:23:32 timfox Exp $ + * $Id: MessageCallbackHandler.java,v 1.61 2006/03/23 19:23:56 timfox Exp $ */ public class MessageCallbackHandler { @@ -85,7 +88,6 @@ if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE) { //Cancel the message - this means it will be immediately redelivered - if (trace) { log.trace("cancelling " + id); } cons.cancelMessage(id); } @@ -98,7 +100,6 @@ } postDeliver(sess, consumerID, m, isConnectionConsumer); - } protected static void preDeliver(SessionDelegate sess, @@ -131,7 +132,7 @@ // Attributes ---------------------------------------------------- - protected SynchronousChannel buffer; + protected LinkedList buffer; protected SessionDelegate sessionDelegate; @@ -141,166 +142,270 @@ protected boolean isConnectionConsumer; - protected boolean closed; - protected volatile Thread receiverThread; - protected Object receiverLock; - - protected volatile MessageListener listener; - - protected volatile boolean stopping; + protected MessageListener listener; protected int deliveryAttempts; protected int ackMode; - protected Object activationLock; - // Executor used for executing onMessage methods - there is one per session - protected Executor onMessageExecutor; + protected QueuedExecutor onMessageExecutor; // Executor for executing activateConsumer methods asynchronously, there is one pool per connection - protected Executor pooledExecutor; + protected PooledExecutor activateConsumerExecutor; + + protected Object mainLock; + + protected Object activationLock; + + protected Object onMessageLock; - // We need to keep track of how many calls to activate we have made so when we close the consumer - // we can wait for the last one to complete otherwise we can end up with closing the consumer and - // then a call to activate occurs causing an exception - protected SynchronizedInt activationCount; + protected boolean closed; + + protected boolean closing; + + protected boolean gotLastMessage; + + //The id of the last message we received + protected long lastMessageId = -1; + protected volatile int activationCount; + + protected volatile boolean onMessageExecuting; // Constructors -------------------------------------------------- - public MessageCallbackHandler(boolean isCC, int ackMode, Executor executor, Executor pooledExecutor, + public MessageCallbackHandler(boolean isCC, int ackMode, QueuedExecutor onMessageExecutor, + PooledExecutor activateConsumerExecutor, SessionDelegate sess, ConsumerDelegate cons, int consumerID) { - buffer = new SynchronousChannel(); + buffer = new LinkedList(); isConnectionConsumer = isCC; - receiverLock = new Object(); - - activationLock = new Object(); - this.ackMode = ackMode; - this.onMessageExecutor = executor; - - this.pooledExecutor = pooledExecutor; + this.onMessageExecutor = onMessageExecutor; - activationCount = new SynchronizedInt(0); + this.activateConsumerExecutor = activateConsumerExecutor; this.sessionDelegate = sess; this.consumerDelegate = cons; this.consumerID = consumerID; + + mainLock = new Object(); + + activationLock = new Object(); + + onMessageLock = new Object(); } - public synchronized void handleMessage(MessageProxy md) throws HandleCallbackException + // Public -------------------------------------------------------- + + public void handleMessage(MessageProxy md) throws HandleCallbackException { if (trace) { log.trace("receiving message " + md + " from the remoting layer"); } + md = processMessage(md); + + synchronized (mainLock) + { if (closed) { - log.warn("Consumer is closed - ignoring message"); + //Sanity check + //This should never happen + //Part of the close procedure is to ensure that no more messages will be sent + //If this happens it implies the close() procedure is not functioning correctly + throw new IllegalStateException("Message has arrived after consumer is closed!"); + } - // Note - we do not cancel the message if the handler is closed. If the handler is closed - // then the corresponding server consumer endpoint is either already closed or about to - // close, in which case its deliveries will be cancelled anyway. - return; + if (closing && gotLastMessage) + { + //Sanity check - this should never happen + //No messages should arrive after the last one sent by the server consumer endpoint + throw new IllegalStateException("Message has arrived after we have received the last one"); } + //We record the last message we received + this.lastMessageId = md.getMessage().getMessageID(); + if (listener != null) { //Queue the message to be delivered by the session - ClientDeliveryRunnable cdr = new ClientDeliveryRunnable(this, processMessage(md)); + ClientDeliveryRunnable cdr = new ClientDeliveryRunnable(md); + + onMessageExecuting = true; + try { onMessageExecutor.execute(cdr); } catch (InterruptedException e) { - log.error("Thread was interrupted, cancelling message", e); - cancelMessage(md); + //This should never happen + throw new IllegalStateException("Thread interrupted in client delivery executor"); } } else { - try - { - //We attempt to put the message in the Channel - //The call will return false if the message is not picked up the receiving - //thread in which case we need to cancel it - - //channel.offer will *only* return true if there is a thread waiting to take a - //message using take() or poll() hence we can guarantee there is no chance any - //messages can arrive and are left in the channel without being handled - this is - //why we use a SynchronousChannel :) + //Put the message in the buffer + //And notify any waiting receive() + //On close any remaining messages will be cancelled + //We do not wait for the message to be received before returning - boolean handled = buffer.offer(processMessage(md), 10000); + buffer.add(md); + } - if (!handled) - { - //There is no-one waiting for our message so we cancel it - if (!closed) + if (closing) { - cancelMessage(md); + //If closing then we may have the close() thread waiting for the last message as well as a receive + //thread + mainLock.notifyAll(); } + else + { + //Otherwise we will only have at most one receive thread waiting + //We don't want to do notifyAll in both cases since notifyAll can have a perf penalty + if (receiverThread != null) + { + mainLock.notify(); } } - catch(InterruptedException e) - { - String msg = "Interrupted attempt to put message in the delivery buffer"; - log.error(msg); - throw new HandleCallbackException(msg, e); } } + public void setMessageListener(MessageListener listener) throws JMSException + { + synchronized (mainLock) + { + if (closed) + { + throw new JMSException("Cannot set MessageListener - consumer is closed"); } - - // Public -------------------------------------------------------- - - public synchronized void setMessageListener(MessageListener listener) throws JMSException - { // JMS consumer is single threaded, so it shouldn't be possible to set a MessageListener // while another thread is receiving if (receiverThread != null) { // Should never happen - throw new javax.jms.IllegalStateException("Another thread is already receiving"); + throw new javax.jms.IllegalStateException("Consumer is currently in receive(..) Cannot set MessageListener"); } + synchronized (onMessageLock) + { this.listener = listener; + } if (trace) { log.trace("installed listener " + listener); } activateConsumer(); } + } + + public void close() throws JMSException + { + try + { + synchronized (mainLock) + { + if (closed) + { + return; + } + + closing = true; + + //We wait for any activation in progress to complete and the resulting message + //(if any) to be returned and processed. + //The ensures a clean, gracefully closure of the client side consumer, without + //any messages in transit which might arrive after the consumer is closed and which + //subsequently might be cancelled out of sequence causing message ordering problems + waitForActivationsToComplete(); + //Now we know there are no activations in progress but the consumer may still be active so we call + //deactivate which returns the id of the last message we should have received + //if we have received this message then we know there is no possibility of any message still in + //transit and we can close down with confidence + //otherwise we wait for this message and timeout if it doesn't arrive which might be the case + //if the connection to the server has been lost - public synchronized void close() + //TODO Make configurable + final int TIMEOUT = 20000; + + long lastMessageIDToExpect = deactivateConsumer(); + + if (lastMessageIDToExpect != -1) { - closed = true; + long waitTime = TIMEOUT; - stopReceiver(); + while (lastMessageIDToExpect != lastMessageId && waitTime > 0) + { + waitTime = waitOnLock(mainLock, waitTime); + } - //There may still be pending calls to activateConsumer - we wait for them to complete (or fail) - synchronized (activationLock) + if (lastMessageIDToExpect != lastMessageId) { - while (activationCount.get() != 0) + log.warn("Timed out waiting for last message to arrive, last=" + lastMessageId +" expected=" + lastMessageIDToExpect); + } + } + + //We set this even if we timed out waiting since we do not want any more to arrive now + gotLastMessage = true; + + //Wake up any receive() thread that might be waiting + mainLock.notify(); + + //Now make sure that any onMessage of a listener has finished executing + + long waitTime = TIMEOUT; + + synchronized (onMessageLock) { - try + while (onMessageExecuting && waitTime > 0) { - activationLock.wait(); + waitTime = waitOnLock(onMessageLock, waitTime); } - catch (InterruptedException e) + if (onMessageExecuting) { - log.error("Thread interrupted", e); + //Timed out waiting for last onMessage to be processed + log.warn("Timed out waiting for last onMessage to be executed"); } } + + //Now we know that all messages have been received and processed + + if (!buffer.isEmpty()) + { + //Now we cancel any deliveries that might be waiting in our buffer + Iterator iter = buffer.iterator(); + + List ids = new ArrayList(); + while (iter.hasNext()) + { + MessageProxy mp = (MessageProxy)iter.next(); + + ids.add(new Long(mp.getMessage().getMessageID())); + } + cancelMessages(ids); + } + + //Now we are done + listener = null; + + receiverThread = null; + + closed = true; + } + } + catch (InterruptedException e) + { + //No one should be interrupting the thread so this shouldn't occur + throw new IllegalStateException("Thread interrupted while closing consumer"); } } @@ -314,9 +419,16 @@ */ public MessageProxy receive(long timeout) throws JMSException { + synchronized (mainLock) + { + if (closed) + { + throw new JMSException("Cannot call receive(..) Consumer is closed"); + } + if (listener != null) { - throw new JBossJMSException("A message listener is already registered"); + throw new JMSException("The consumer has a MessageListener set, cannot call receive(..)"); } receiverThread = Thread.currentThread(); @@ -329,16 +441,11 @@ { while(true) { - try - { if (timeout == 0) { if (trace) log.trace("receive with no timeout"); - if (!stopping) - { m = getMessage(0); - } if (m == null) { @@ -352,10 +459,7 @@ //ReceiveNoWait if (trace) { log.trace("receive noWait"); } - if (!stopping) - { m = getMessage(-1); - } if (m == null) { @@ -367,10 +471,7 @@ { if (trace) { log.trace("receive timeout " + timeout + " ms, blocking poll on queue"); } - if (!stopping) - { m = getMessage(timeout); - } if (m == null) { @@ -380,20 +481,6 @@ return null; } } - } - catch(InterruptedException e) - { - if (trace) { log.trace("Thread was interrupted"); } - - if (closed) - { - return null; - } - else - { - throw new JMSException("Interrupted"); - } - } if (trace) { log.trace("got " + m); } @@ -410,24 +497,32 @@ log.debug("message expired, discarding"); - // discard the message, adjust timeout and reenter the buffer + // the message expired, so discard the message, adjust timeout and reenter the buffer if (timeout != 0) { timeout -= System.currentTimeMillis() - startTimestamp; } + + if (closing) + { + return null; + } } } finally { receiverThread = null; } - + } } - public synchronized MessageListener getMessageListener() + public MessageListener getMessageListener() + { + synchronized (onMessageLock) { return listener; } + } public String toString() { @@ -438,42 +533,57 @@ // Protected ----------------------------------------------------- - protected void cancelMessage(MessageProxy m) + protected void waitForActivationsToComplete() + { + synchronized (activationLock) + { + while (activationCount > 0) { try { - consumerDelegate.cancelMessage(m.getMessage().getMessageID()); + activationLock.wait(); } - catch (Exception e) + catch (InterruptedException e) { - //It may be ok that we cannot cancel the message - e.g. - //if the corresponding server consumer delegate is already closed - //in which case the deliveries will be cancelled anyway - String msg = "Failed to cancel message"; - log.warn(msg, e); + //This should never occur + throw new IllegalStateException("Thread interrupted in waiting for activation count to reach zero"); + } + } } } - protected void stopReceiver() + protected long waitOnLock(Object lock, long waitTime) throws InterruptedException { - synchronized (receiverLock) + long start = System.currentTimeMillis(); + + //Wait for last message to arrive + lock.wait(waitTime); + + long waited = System.currentTimeMillis() - start; + + if (waited < waitTime) { - stopping = true; + waitTime = waitTime - waited; - //The listener loop may not be waiting, so interrupting the thread will - //not necessarily work, thus leaving it hanging - //so we use the listenerStopping variable too - if (receiverThread != null) + return waitTime; + } + else { - receiverThread.interrupt(); + return 0; } - - //FIXME - There is a possibility the receiver thread could still be waiting inside the receive() method - //after the interrupt - need to deal with this - - stopping = false; } + protected void cancelMessages(List ids) + { + try + { + consumerDelegate.cancelMessages(ids); + } + catch (Exception e) + { + String msg = "Failed to cancel messages"; + log.warn(msg, e); + } } protected void activateConsumer() throws JMSException @@ -485,25 +595,32 @@ try { if (trace) { log.trace("initiating consumer endpoint activation"); } - pooledExecutor.execute(new ConsumerActivationRunnable()); - activationCount.increment(); + activationCount++; + activateConsumerExecutor.execute(new ConsumerActivationRunnable()); } catch (InterruptedException e) { - throw new JBossJMSException("Thread interrupted", e); + //This should never happen + throw new IllegalStateException("Activation executor thread interrupted"); } } - protected void deactivateConsumer() throws JMSException + protected long deactivateConsumer() throws JMSException { - consumerDelegate.deactivate(); + return consumerDelegate.deactivate(); } protected MessageProxy getMessageNow() throws JMSException { MessageProxy del = (MessageProxy)consumerDelegate.getMessageNow(false); + if (del != null) { + //We record the id of the last message delivered + //No need to notify here since this will never be called while we + //are closing + lastMessageId = del.getMessage().getMessageID(); + return processMessage(del); } else @@ -512,7 +629,7 @@ } } - protected MessageProxy getMessage(long timeout) throws InterruptedException, JMSException + protected MessageProxy getMessage(long timeout) throws JMSException { MessageProxy m = null; @@ -525,35 +642,59 @@ { // ... otherwise we activate the server side consumer and wait for a message to arrive // asynchronously - activateConsumer(); try { if (timeout == 0) { - // Indefinite wait - m = (MessageProxy)buffer.take(); + //Wait for ever potentially + while (!closing && buffer.isEmpty()) + { + mainLock.wait(); + } } else { - // Wait with timeout - m = (MessageProxy)buffer.poll(timeout); + //Wait with timeout + long toWait = timeout; + + while (!closing && buffer.isEmpty() && toWait > 0) + { + toWait = waitOnLock(mainLock, toWait); } } - finally - { - // We only need to call this if we didn't receive a message synchronously - if (!closed) + if (closing) + { + m = null; + } + else + { + if (!buffer.isEmpty()) + { + m = (MessageProxy)buffer.removeFirst(); + } + else + { + m = null; + } + } + } + catch (InterruptedException e) { + //No one should be interrupting the thread so this shouldn't occur + throw new IllegalStateException("Thread interrupted while receiving"); + } + finally + { + // We only need to call this if we timed out if (m == null) { deactivateConsumer(); } } } - } return m; } @@ -578,24 +719,26 @@ private class ClientDeliveryRunnable implements Runnable { - private MessageCallbackHandler handler; - private MessageProxy message; - private ClientDeliveryRunnable(MessageCallbackHandler handler, MessageProxy message) + private ClientDeliveryRunnable(MessageProxy message) { - this.handler = handler; this.message = message; } public void run() { - synchronized (handler) + //We synchronize here to prevent the message listener being set with a different one + //between callOnMessage and activate being called + synchronized (onMessageLock) { - if (handler.closed) + if (closed) { - //The handler is closed - ignore the message - the serverconsumerdelegate will already be - //closed so the message will have already been cancelled + //Sanity check. This should never happen + //Part of the close procedure is to ensure there are no messages in the executor queue + //for delivery to the MessageListener + //If this happens it implies the close() procedure is not working properly + throw new IllegalStateException("Calling onMessage() but the consumer is closed!"); } else { @@ -603,8 +746,16 @@ { MessageCallbackHandler.callOnMessage(consumerDelegate, sessionDelegate, listener, consumerID, isConnectionConsumer, message, ackMode); + if (!closing) + { consumerDelegate.activate(); } + + onMessageExecuting = false; + + //The close() thread may be waiting for us to finish executing, so wake it up + onMessageLock.notify(); + } catch (JMSException e) { log.error("Failed to deliver message", e); @@ -620,12 +771,26 @@ { try { + //We always try and return the message immediately, if available. + //This prevents an extra network call to deliver the message. + //If the message is not available, the consumer will stay active and + //the message will delivered asynchronously (pushed) (that is what the boolean param is for) + MessageProxy m = (MessageProxy)consumerDelegate.getMessageNow(true); if (m != null) { handleMessage(m); } + + synchronized (activationLock) + { + activationCount--; + if (activationCount == 0) + { + activationLock.notify(); + } + } } catch(Throwable t) { @@ -634,18 +799,17 @@ { log.error("Cause:" + t.getCause()); } - stopReceiver(); - } - finally + try { - synchronized (activationLock) + close(); + } + catch (JMSException e) { - activationCount.decrement(); - activationLock.notifyAll(); + log.error("Failed to close consumer", e); } } - } } } + |