From: Juergen H. <jho...@us...> - 2008-10-14 14:09:30
|
Update of /cvsroot/springframework/spring/src/org/springframework/jms/listener In directory fdv4jf1.ch3.sourceforge.com:/tmp/cvs-serv7091/src/org/springframework/jms/listener Modified Files: DefaultMessageListenerContainer.java AbstractJmsListeningContainer.java Log Message: DefaultMessageListenerContainer supports a stop notification callback for unlimited maxMessagesPerTask as well now (revised wait-while-not-running behavior accordingly) Index: DefaultMessageListenerContainer.java =================================================================== RCS file: /cvsroot/springframework/spring/src/org/springframework/jms/listener/DefaultMessageListenerContainer.java,v retrieving revision 1.49 retrieving revision 1.50 diff -C2 -d -r1.49 -r1.50 *** DefaultMessageListenerContainer.java 17 Jul 2008 07:46:27 -0000 1.49 --- DefaultMessageListenerContainer.java 14 Oct 2008 14:09:19 -0000 1.50 *************** *** 874,883 **** try { if (maxMessagesPerTask < 0) { ! while (isActive()) { ! waitWhileNotRunning(); ! if (isActive()) { ! messageReceived = invokeListener(); ! } ! } } else { --- 874,878 ---- try { if (maxMessagesPerTask < 0) { ! messageReceived = executeOngoingLoop(); } else { *************** *** 913,921 **** } synchronized (lifecycleMonitor) { ! activeInvokerCount--; ! if (stopCallback != null && activeInvokerCount == 0) { ! stopCallback.run(); ! stopCallback = null; ! } lifecycleMonitor.notifyAll(); } --- 908,912 ---- } synchronized (lifecycleMonitor) { ! decreaseActiveInvokerCount(); lifecycleMonitor.notifyAll(); } *************** *** 951,954 **** --- 942,981 ---- } + private boolean executeOngoingLoop() throws JMSException { + boolean messageReceived = false; + boolean active = true; + while (active) { + synchronized (lifecycleMonitor) { + boolean interrupted = false; + boolean wasWaiting = false; + while ((active = isActive()) && !isRunning()) { + if (interrupted) { + throw new IllegalStateException("Thread was interrupted while waiting for " + + "a restart of the listener container, but container is still stopped"); + } + if (!wasWaiting) { + decreaseActiveInvokerCount(); + } + wasWaiting = true; + try { + lifecycleMonitor.wait(); + } + catch (InterruptedException ex) { + // Re-interrupt current thread, to allow other threads to react. + Thread.currentThread().interrupt(); + interrupted = true; + } + } + if (wasWaiting) { + activeInvokerCount++; + } + } + if (active) { + messageReceived = (invokeListener() || messageReceived); + } + } + return messageReceived; + } + private boolean invokeListener() throws JMSException { initResourcesIfNecessary(); *************** *** 958,961 **** --- 985,996 ---- } + private void decreaseActiveInvokerCount() { + activeInvokerCount--; + if (stopCallback != null && activeInvokerCount == 0) { + stopCallback.run(); + stopCallback = null; + } + } + private void initResourcesIfNecessary() throws JMSException { if (getCacheLevel() <= CACHE_CONNECTION) { Index: AbstractJmsListeningContainer.java =================================================================== RCS file: /cvsroot/springframework/spring/src/org/springframework/jms/listener/AbstractJmsListeningContainer.java,v retrieving revision 1.14 retrieving revision 1.15 diff -C2 -d -r1.14 -r1.15 *** AbstractJmsListeningContainer.java 16 Jul 2008 22:48:44 -0000 1.14 --- AbstractJmsListeningContainer.java 14 Oct 2008 14:09:19 -0000 1.15 *************** *** 31,35 **** import org.springframework.jms.support.JmsUtils; import org.springframework.jms.support.destination.JmsDestinationAccessor; - import org.springframework.util.Assert; import org.springframework.util.ClassUtils; --- 31,34 ---- *************** *** 329,357 **** } - /** - * Wait while this container is not running. - * <p>To be called by asynchronous tasks that want to block - * while the container is in stopped state. - */ - protected final void waitWhileNotRunning() { - synchronized (this.lifecycleMonitor) { - boolean interrupted = false; - while (this.active && !isRunning()) { - if (interrupted) { - throw new IllegalStateException("Thread was interrupted while waiting for " + - "a restart of the listener container, but container is still stopped"); - } - try { - this.lifecycleMonitor.wait(); - } - catch (InterruptedException ex) { - // Re-interrupt current thread, to allow other threads to react. - Thread.currentThread().interrupt(); - interrupted = true; - } - } - } - } - //------------------------------------------------------------------------- --- 328,331 ---- *************** *** 506,510 **** */ protected final boolean rescheduleTaskIfNecessary(Object task) { - Assert.notNull(task, "Task object must not be null"); if (this.running) { try { --- 480,483 ---- |