Update of /cvsroot/jaffa/JaffaComponentsMessaging/source/java/org/jaffa/modules/messaging/services In directory sc8-pr-cvs6.sourceforge.net:/tmp/cvs-serv24103/source/java/org/jaffa/modules/messaging/services Modified Files: JaffaMessageBean.java JmsBrowser.java Added Files: IManageableMessageHandler.java ManageableMessageHandlerService.java Log Message: Created an IManageableMessageHandler interface. If a message handler implements this interface, then it can be terminated while in-process. --- NEW FILE: ManageableMessageHandlerService.java --- /* * ==================================================================== * JAFFA - Java Application Framework For All * * Copyright (C) 2002 JAFFA Development Group * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * Redistribution and use of this software and associated documentation ("Software"), * with or without modification, are permitted provided that the following conditions are met: * 1. Redistributions of source code must retain copyright statements and notices. * Redistributions must also contain a copy of this document. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * 3. The name "JAFFA" must not be used to endorse or promote products derived from * this Software without prior written permission. For written permission, * please contact mail to: jaf...@ya.... * 4. Products derived from this Software may not be called "JAFFA" nor may "JAFFA" * appear in their names without prior written permission. * 5. Due credit should be given to the JAFFA Project (http://jaffa.sourceforge.net). * * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== */ package org.jaffa.modules.messaging.services; import java.util.HashMap; import java.util.Map; import org.apache.log4j.Logger; import org.jaffa.exceptions.ApplicationExceptions; import org.jaffa.exceptions.FrameworkException; /** This class maintains a cache of IManageableMessageHandler instances keyed by the messageId. * It can be used to manage message handlers. * NOTE: This service may not work when the handlers are invoked by a cluster of MDBs. */ public class ManageableMessageHandlerService { private static final Logger log = Logger.getLogger(ManageableMessageHandlerService.class); private static Map<String, IManageableMessageHandler> c_cache = new HashMap<String, IManageableMessageHandler>(); /** Caches the input message handler. * @param messageId the message Id. * @param messageHandler the message handler */ public synchronized static void addManageableMessageHandler(String messageId, IManageableMessageHandler messageHandler) { c_cache.put(messageId, messageHandler); } /** Returns the message handler for the given messageId. * A null is returned in case a message handler was not added, or if it has been removed. * @param messageId the message Id. * @return the message handler for the given messageId. */ public static IManageableMessageHandler findManageableMessageHandler(String messageId) { return c_cache.get(messageId); } /** Removes the message handler for the given messageId, from the cache. * @param messageId the message Id. * @return the message handler that has been removed. */ public synchronized static IManageableMessageHandler removeManageableMessageHandler(String messageId) { return c_cache.remove(messageId); } /** This method will be invoked to terminate a long running message-handling process. * @param messageId the message Id. * @throws FrameworkException in case any internal error occurs. * @throws ApplicationExceptions Indicates application error(s). */ public static void interrupt(String messageId) throws FrameworkException, ApplicationExceptions { if (log.isDebugEnabled()) log.debug("Interrupting the handler for the message " + messageId); // find the message handler IManageableMessageHandler messageHandler = findManageableMessageHandler(messageId); if (messageHandler != null) { // interrupt the message handler messageHandler.interrupt(); if (log.isDebugEnabled()) log.debug("Interrupted the handler for the message " + messageId); // uncache the message handler removeManageableMessageHandler(messageId); } else { if (log.isDebugEnabled()) log.debug("Handler not found for the message " + messageId); } } } --- NEW FILE: IManageableMessageHandler.java --- /* * ==================================================================== * JAFFA - Java Application Framework For All * * Copyright (C) 2002 JAFFA Development Group * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * Redistribution and use of this software and associated documentation ("Software"), * with or without modification, are permitted provided that the following conditions are met: * 1. Redistributions of source code must retain copyright statements and notices. * Redistributions must also contain a copy of this document. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * 3. The name "JAFFA" must not be used to endorse or promote products derived from * this Software without prior written permission. For written permission, * please contact mail to: jaf...@ya.... * 4. Products derived from this Software may not be called "JAFFA" nor may "JAFFA" * appear in their names without prior written permission. * 5. Due credit should be given to the JAFFA Project (http://jaffa.sourceforge.net). * * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== */ package org.jaffa.modules.messaging.services; import org.jaffa.exceptions.ApplicationExceptions; import org.jaffa.exceptions.FrameworkException; /** If a dataBean handler implements this interface, then it'll be possible to terminate * long running message-handling processes through the invocation of the interrupt() method. */ public interface IManageableMessageHandler { /** This method will be invoked to terminate a long running message-handling process. * @throws FrameworkException in case any internal error occurs. * @throws ApplicationExceptions Indicates application error(s). */ public void interrupt() throws FrameworkException, ApplicationExceptions; } Index: JaffaMessageBean.java =================================================================== RCS file: /cvsroot/jaffa/JaffaComponentsMessaging/source/java/org/jaffa/modules/messaging/services/JaffaMessageBean.java,v retrieving revision 1.17 retrieving revision 1.18 diff -C2 -d -r1.17 -r1.18 *** JaffaMessageBean.java 9 May 2007 02:07:16 -0000 1.17 --- JaffaMessageBean.java 20 Jun 2007 16:18:18 -0000 1.18 *************** *** 194,198 **** // Invokes the handler as specified by the 'toClass and toMethod' combination in the MessageConfig ! invokeHandler(messageInfo, payload); if (log.isInfoEnabled()) --- 194,198 ---- // Invokes the handler as specified by the 'toClass and toMethod' combination in the MessageConfig ! invokeHandler(aMessage != null ? aMessage.getJMSMessageID() : null, messageInfo, payload); if (log.isInfoEnabled()) *************** *** 240,243 **** --- 240,244 ---- /** Invokes the handler to process the payload. + * @param messageId the message Id. * @param messageInfo the messageInfo that contains the handler information. * @param payload the payload to be processed. *************** *** 248,252 **** * @throws InvocationTargetException if any error occurs while invoking the handler. */ ! private static void invokeHandler(MessageInfo messageInfo, Object payload) throws ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException { if (messageInfo.getToClass() == null || messageInfo.getToClass().length() == 0 || messageInfo.getToMethod() == null || messageInfo.getToMethod().length() == 0) { --- 249,253 ---- * @throws InvocationTargetException if any error occurs while invoking the handler. */ ! private static void invokeHandler(String messageId, MessageInfo messageInfo, Object payload) throws ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException { if (messageInfo.getToClass() == null || messageInfo.getToClass().length() == 0 || messageInfo.getToMethod() == null || messageInfo.getToMethod().length() == 0) { *************** *** 272,283 **** // Create an instance of handlerClass, if the handlerMethod is not static Object handlerObject = null; ! if (!Modifier.isStatic(handlerMethod.getModifiers())) handlerObject = handlerClass.newInstance(); ! // Invoke the handler ! if (log.isDebugEnabled()) ! log.debug("Invoking the handler " + handlerMethod); ! handlerMethod.invoke(handlerObject, new Object[] {payload}); } } --- 273,301 ---- // Create an instance of handlerClass, if the handlerMethod is not static + // or if the handlerClass implements the IManageableMessageHandler interface Object handlerObject = null; ! if (!Modifier.isStatic(handlerMethod.getModifiers()) ! || (messageId != null && IManageableMessageHandler.class.isAssignableFrom(handlerClass))) handlerObject = handlerClass.newInstance(); ! // Cache the handler if it implements the IManageableMessageHandler interface ! if (messageId != null && IManageableMessageHandler.class.isAssignableFrom(handlerClass)) { ! if (log.isDebugEnabled()) ! log.debug("Caching the message handler instance " + handlerObject); ! ManageableMessageHandlerService.addManageableMessageHandler(messageId, (IManageableMessageHandler) handlerObject); ! } ! ! try { ! // Invoke the handler ! if (log.isDebugEnabled()) ! log.debug("Invoking the handler " + handlerMethod); ! handlerMethod.invoke(handlerObject, new Object[] {payload}); ! } finally { ! if (messageId != null && IManageableMessageHandler.class.isAssignableFrom(handlerClass)) { ! if (log.isDebugEnabled()) ! log.debug("Uncaching the message handler instance " + handlerObject); ! ManageableMessageHandlerService.removeManageableMessageHandler(messageId); ! } ! } } } Index: JmsBrowser.java =================================================================== RCS file: /cvsroot/jaffa/JaffaComponentsMessaging/source/java/org/jaffa/modules/messaging/services/JmsBrowser.java,v retrieving revision 1.21 retrieving revision 1.22 diff -C2 -d -r1.21 -r1.22 *** JmsBrowser.java 19 Jun 2007 10:57:44 -0000 1.21 --- JmsBrowser.java 20 Jun 2007 16:18:18 -0000 1.22 *************** *** 597,603 **** throw new ApplicationExceptions(new JaffaMessagingApplicationException(JaffaMessagingApplicationException.NO_ADMIN_MESSAGE_ACESSS, new Object[] {originalQueueName})); ! //@todo ! //if (log.isDebugEnabled()) ! // log.debug("Killed in-process message " + messageId); } catch (JMSException e) { log.error("Error in deleting a JMS Message", e); --- 597,602 ---- throw new ApplicationExceptions(new JaffaMessagingApplicationException(JaffaMessagingApplicationException.NO_ADMIN_MESSAGE_ACESSS, new Object[] {originalQueueName})); ! // Interrupt the process ! ManageableMessageHandlerService.interrupt(messageId); } catch (JMSException e) { log.error("Error in deleting a JMS Message", e); |