From: <rk...@us...> - 2011-11-22 16:45:16
|
Revision: 5743 http://jaffa.svn.sourceforge.net/jaffa/?rev=5743&view=rev Author: rkisson Date: 2011-11-22 16:45:05 +0000 (Tue, 22 Nov 2011) Log Message: ----------- Added in a plug able interface to intercept and perform an operation during the deletion of a transaction Modified Paths: -------------- branches/JAFFA_5_10_RELEASE/JaffaSOA/source/java/org/jaffa/modules/messaging/services/JmsBrowser.java Added Paths: ----------- branches/JAFFA_5_10_RELEASE/JaffaSOA/source/java/org/jaffa/modules/common/ branches/JAFFA_5_10_RELEASE/JaffaSOA/source/java/org/jaffa/modules/common/IMessageHandler.java Property Changed: ---------------- branches/JAFFA_5_10_RELEASE/JaffaSOA/ Property changes on: branches/JAFFA_5_10_RELEASE/JaffaSOA ___________________________________________________________________ Modified: svn:ignore - dist + dist bin .classpath .project Added: branches/JAFFA_5_10_RELEASE/JaffaSOA/source/java/org/jaffa/modules/common/IMessageHandler.java =================================================================== --- branches/JAFFA_5_10_RELEASE/JaffaSOA/source/java/org/jaffa/modules/common/IMessageHandler.java (rev 0) +++ branches/JAFFA_5_10_RELEASE/JaffaSOA/source/java/org/jaffa/modules/common/IMessageHandler.java 2011-11-22 16:45:05 UTC (rev 5743) @@ -0,0 +1,73 @@ +/* + * ==================================================================== + * JAFFA - Java Application Framework For All + * + * Copyright (C) 2002 JAFFA Development Group + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Redistribution and use of this software and associated documentation ("Software"), + * with or without modification, are permitted provided that the following conditions are met: + * 1. Redistributions of source code must retain copyright statements and notices. + * Redistributions must also contain a copy of this document. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name "JAFFA" must not be used to endorse or promote products derived from + * this Software without prior written permission. For written permission, + * please contact mail to: jaf...@ya.... + * 4. Products derived from this Software may not be called "JAFFA" nor may "JAFFA" + * appear in their names without prior written permission. + * 5. Due credit should be given to the JAFFA Project (http://jaffa.sourceforge.net). + * + * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR + * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * ==================================================================== + */ + +package org.jaffa.modules.common; + +import java.util.Map; + +import org.jaffa.exceptions.ApplicationExceptions; +import org.jaffa.exceptions.FrameworkException; +import org.jaffa.persistence.UOW; + +/** A interface to plug in to events of a message. + */ +public interface IMessageHandler { + + + /** + * Provides a plug able method during the deletion of a message + * @param uow + * @param messageHeaders + * @param payload + * @throws ApplicationExceptions + * @throws FrameworkException + */ + public void onDelete(UOW uow, Map<String,String> messageHeaders, Object payload) throws ApplicationExceptions, FrameworkException; + +} Modified: branches/JAFFA_5_10_RELEASE/JaffaSOA/source/java/org/jaffa/modules/messaging/services/JmsBrowser.java =================================================================== --- branches/JAFFA_5_10_RELEASE/JaffaSOA/source/java/org/jaffa/modules/messaging/services/JmsBrowser.java 2011-11-22 16:42:47 UTC (rev 5742) +++ branches/JAFFA_5_10_RELEASE/JaffaSOA/source/java/org/jaffa/modules/messaging/services/JmsBrowser.java 2011-11-22 16:45:05 UTC (rev 5743) @@ -49,9 +49,13 @@ package org.jaffa.modules.messaging.services; +import java.lang.reflect.Method; import java.util.Enumeration; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; + import javax.jms.Connection; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -59,12 +63,18 @@ import javax.jms.MessageConsumer; import javax.jms.QueueBrowser; import javax.jms.Session; +import javax.jms.TextMessage; + import org.apache.log4j.Logger; import org.jaffa.exceptions.ApplicationExceptions; import org.jaffa.exceptions.FrameworkException; -import org.jaffa.modules.messaging.services.JaffaMessagingApplicationException; +import org.jaffa.modules.common.IMessageHandler; +import org.jaffa.modules.messaging.services.configdomain.MessageInfo; +import org.jaffa.modules.messaging.services.configdomain.Param; import org.jaffa.modules.messaging.services.configdomain.QueueInfo; +import org.jaffa.persistence.UOW; import org.jaffa.security.SecurityManager; +import org.jaffa.util.JAXBHelper; /** A utility class for browsing queues. */ @@ -401,6 +411,8 @@ if (!hasAdminMessageAccess(originalQueueName)) throw new ApplicationExceptions(new JaffaMessagingApplicationException(JaffaMessagingApplicationException.NO_ADMIN_MESSAGE_ACESSS, new Object[] {originalQueueName})); + //Invoke Message Handler class to perform onDelete process + invokeHandler(message,"onDelete"); // Obtain a Connection with the JMS provider Connection connection = JmsClientHelper.obtainConnection(); @@ -427,7 +439,91 @@ } } } + /** + * Invokes the intended handler. This Handler must implement the IMessageHandler Interface in order to be invoked. + * + * @param message + * @param methodName + * @throws JaffaMessagingFrameworkException + */ + private static void invokeHandler(Message message, String methodName) throws JaffaMessagingFrameworkException { + + UOW uow = null; + try { + // Reads the MessageConfig from the ConfigurationService, based on the dataBeanClassName + String dataBeanClassName = message.getStringProperty(JmsBrowser.HEADER_DATA_BEAN_CLASS_NAME); + MessageInfo messageInfo = ConfigurationService.getInstance().getMessageInfo(dataBeanClassName); + if (messageInfo == null) + throw new JaffaMessagingFrameworkException(JaffaMessagingFrameworkException.MESSAGE_INFO_MISSING, new Object[]{dataBeanClassName}); + + // Obtain the handlerClass + Class handlerClass = Class.forName(messageInfo.getToClass()); + if(IMessageHandler.class.isAssignableFrom(handlerClass)) { + // Unmarshals the Message payload into a dataBean using the dataBeanClassName + Object payload = JAXBHelper.unmarshalPayload(obtainMessageContents(message), dataBeanClassName); + if (log.isDebugEnabled()) + log.debug("Obtained payload for the dataBean " + dataBeanClassName + '\n' + payload); + + Method handlerMethod = null; + Object handlerObject = null; + // Obtain the handler method + try { + handlerMethod = handlerClass.getMethod(methodName, new Class[]{UOW.class,Map.class, Object.class}); + } catch (NoSuchMethodException e) { + // Note that the payload could be a subclass of the argument that the handler method expects + // Hence use the dataBeanClass specified in the messageInfo to get the appropriate handlerMethod + if (log.isDebugEnabled()) + log.debug(methodName + " method not found in " + handlerClass.getName()); + + return; + } + handlerObject = handlerClass.newInstance(); + uow = new UOW(); + + Map<String,String> headerMap = new HashMap<String,String>(); + // Sets the header elements as defined in the configuration file. + if (messageInfo.getHeader() != null && messageInfo.getHeader(). + getParam() != null) { + for (Param param : messageInfo.getHeader().getParam()) { + String headerValue = (String)message.getObjectProperty(param.getName()); + headerMap.put(param.getName(), headerValue); + } + } + handlerMethod.invoke(handlerObject, new Object[]{uow,headerMap,payload}); + uow.commit(); + } + } + catch (Exception e) { + // Just log the error + log.error("Exception thrown while deleting the message. Message was: " + message, e); + throw new JaffaMessagingFrameworkException(JaffaMessagingFrameworkException.DELETE_ERROR, null, e); + } finally { + if(uow!=null) { + try { + uow.rollback(); + } catch(Exception e) { + throw new JaffaMessagingFrameworkException(JaffaMessagingFrameworkException.DELETE_ERROR, null, e); + } + } + } + + } + + /** Obtains the contents of the input message. + * This currently supports a TextMessage only. + * A null will be returned for all other message types. + * @param aMessage the Message. + * @throws JMSException if any error occurs in reading the input message. + * @return the contents of the input message. + */ + private static String obtainMessageContents(Message aMessage) throws JMSException { + String contents = null; + if (aMessage instanceof TextMessage) + contents = ((TextMessage) aMessage).getText(); + return contents; + } + /** Resubmits the message, which is currently in the error queue. * It is achieved by creating a temporary consumer for the original message to delete it, * and by sending a copy of the original message to the original queue. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |