From: <jbo...@li...> - 2006-06-30 18:43:09
|
Author: estebanschifman Date: 2006-06-30 14:43:05 -0400 (Fri, 30 Jun 2006) New Revision: 4889 Added: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java Log: A new abtract class that can be extended for any "poller" e.g. Directory poller, SQL table poller, Email poller, etc See how it's used in "BetterDirListener" (that replaces DirListener) Added: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java =================================================================== --- labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-06-30 18:40:04 UTC (rev 4888) +++ labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-06-30 18:43:05 UTC (rev 4889) @@ -0,0 +1,289 @@ +/* +* JBoss, Home of Professional Open Source +* Copyright 2006, JBoss Inc., and individual contributors as indicated +* by the @authors tag. See the copyright.txt in the distribution for a +* full listing of individual contributors. +* +* This is free software; you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as +* published by the Free Software Foundation; either version 2.1 of +* the License, or (at your option) any later version. +* +* This software is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +* Lesser General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public +* License along with this software; if not, write to the Free +* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA +* 02110-1301 USA, or see the FSF site: http://www.fsf.org. +*/ + + +package org.jboss.soa.esb.listeners; + +import java.util.*; + +import org.apache.log4j.*; + +import javax.naming.*; +import javax.jms.*; + +import org.jboss.soa.esb.util.*; +import org.jboss.soa.esb.common.*; +import org.jboss.soa.esb.helpers.*; +import org.jboss.soa.esb.parameters.*; + +public abstract class AbstractPoller +{ + + protected abstract void executeOneCycle()throws Exception; + protected abstract GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception; + + public static final String PARM_POLL_LTCY = "pollLatency"; + public static final String PARM_RELOAD_LTCY = "reloadLatency"; + + public static final String PARM_TOPIC_CONN_FACT = "topicConnFactoryClass"; + public static final String PARM_QUIESCE_TOPIC = "quiesceTopic"; + public static final String PARM_QUIESCE_SELECTOR = "quiesceSelector"; + + protected ThreadGroup m_oThrGrp = Thread.currentThread().getThreadGroup(); + protected Map<String,GroupOfChilds> m_omChildPrc + = new HashMap<String,GroupOfChilds>(); + + protected ParamsRepository m_oParmRepos; + protected Name m_oParmsName; + protected Logger m_oLogger; + + protected DomElement m_oParms; + protected boolean m_bEndRequested; + + protected TopicConnection m_oTopicConn; + protected TopicSession m_oSession; + protected Topic m_oTopic; + protected TopicSubscriber m_oTopicSubs; + + + protected AbstractPoller(String p_sParamsUid) throws Exception + { + m_oLogger = EsbUtil.getDefaultLogger(this.getClass()); + + String sFactoryClass = EsbSysProps.getParamsReposFactoryClass(); + m_oParmRepos = ParamsReposUtil.reposFromFactory(sFactoryClass,null); + m_oParmsName = ParamsReposUtil.nameFromString(p_sParamsUid); + } //__________________________________ + + protected void runUntilEndRequested() throws Exception + { while (! m_bEndRequested) + { try + { String sMsg = (null == m_oParms) + ? "Initial Parameter loading" : "Reloading Params"; + m_oLogger.info(formatLogMsg(sMsg)); + m_oParms = m_oParmRepos.getElement(m_oParmsName); + } + catch (Exception e) + { + StringBuilder sb = new StringBuilder ("Problems loading params ") + .append(m_oParmsName) + .append((null==m_oParms)? " exiting..." : "continuing to use cached params") + ; + m_oLogger.error(formatLogMsg(sb.toString())); + if (null==m_oParms) + throw e; + } + quiesceTopicSubscribe(); + executeOneCycle(); + } + } //__________________________________ + + protected String formatLogMsg(String p_s) + { return new StringBuilder("Processor '") + .append(EsbUtil.classSuffix(this.getClass())).append("' <") + .append(p_s).append("") + .toString(); + } //__________________________________ + + private final void quiesceTopicSubscribe() throws JMSException, NamingException + { + try + { + m_oTopicConn = null; + m_oSession = null; + m_oTopic = null; + m_oTopicSubs = null; + + String sStopTopic = m_oParms.getAttr(PARM_QUIESCE_TOPIC); + if (EsbUtil.isNullString(sStopTopic)) + return; + String sFactClass = m_oParms.getAttr(PARM_TOPIC_CONN_FACT); + if (EsbUtil.isNullString(sFactClass)) + sFactClass = "ConnectionFactory"; + + String sJndiType = EsbSysProps.getJndiServerType(); + String sJndiURL = EsbSysProps.getJndiServerURL(); + Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL); + Object tmp = oJndiCtx.lookup(sFactClass); + TopicConnectionFactory tcf = (TopicConnectionFactory) tmp; + + m_oTopicConn = tcf.createTopicConnection(); + m_oTopic = (Topic) oJndiCtx.lookup(sStopTopic); + m_oSession = m_oTopicConn.createTopicSession + (false,TopicSession.AUTO_ACKNOWLEDGE); + m_oTopicConn.start(); + + String sSelector = m_oParms.getAttr(PARM_QUIESCE_SELECTOR); + if (EsbUtil.isNullString(sSelector)) + sSelector = "processor='"+EsbUtil.classSuffix(this.getClass())+"'"; + m_oTopicSubs = m_oSession.createSubscriber(m_oTopic, sSelector,true); + } + catch (Exception e) + { m_oLogger.error("Problems connecting to JMS. ",e); + } + + } //_________________________________________ + + protected boolean waitForQuiesce(long p_lMillis) throws Exception + { + try + { boolean bSleep = (null== m_oTopicSubs); + Object oMsg = (bSleep) ? null : secureReceive(p_lMillis); + + if (null!=oMsg) + { m_oLogger.info("Starting Quiesce of " + +EsbUtil.classSuffix(this.getClass())); + return true; + } + if (bSleep) + Thread.sleep(p_lMillis); + return false; + + } + catch (Exception e) + { m_oLogger.error("Problems with waitForQuiesce. ",e); + Thread.sleep(p_lMillis); + return false; + } + } //_________________________________________ + + private Message secureReceive(long p_lMillis) throws Exception + { + while (true) + try + { return (null==m_oTopicSubs) ? null : m_oTopicSubs.receive(p_lMillis); } + catch (JMSException e) + { + // put here your recovery code + return null; + } + + } //_________________________________________ + + + protected void oneScan(DomElement p_oP, boolean p_bFirst) throws Exception + { + String sPrcName = p_oP.getName(); + if (!m_omChildPrc.containsKey(sPrcName)) + { + ThreadGroup[] oaCh = new ThreadGroup[m_oThrGrp.activeGroupCount()]; + int iMax = m_oThrGrp.enumerate(oaCh); + + ThreadGroup oThG = null; + for (int i1 = 0; null == oThG && i1 < iMax; i1++) + { if (m_oThrGrp.getName().equals(sPrcName)) + oThG = oaCh[i1]; + } + if (null == oThG) + oThG = new ThreadGroup(sPrcName); + m_omChildPrc.put(sPrcName, newChildGroup(oThG)); + } + GroupOfChilds oChildGrp = m_omChildPrc.get(sPrcName); + + if (null == oChildGrp) return; + if (p_bFirst) + oChildGrp.m_bError = false; + + try + { + oChildGrp.execute(p_oP); + } + catch (Exception e) + { + oChildGrp.m_bError = true; + m_oLogger.error(formatLogMsg("GroupOfChilds.execute"), e); + } + } //_________________________________________ + + protected abstract class GroupOfChilds implements Observer + { + protected abstract void doYourJob(DomElement p_oP) throws Exception; + + public static final String PARM_MAX_THREADS = "maxThreads"; + + protected ThreadGroup m_oThrGrp; + protected boolean m_bError = false; + + protected Class m_oExecClass; + protected DomElement m_oChParms; + + protected int m_iQthr = 0, m_iMaxThr; + protected StringBuilder m_sb; + protected int m_iSbIni; + + protected GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception + { + m_oThrGrp = p_oThrGrp; + m_sb = new StringBuilder("GroupOfThreads ") + .append(m_oThrGrp.getName()).append(" : "); + m_iSbIni = m_sb.length(); + } //________________________________ + + public void update(Observable p_oObs, Object p_oUsrObj) + { + if (p_oUsrObj instanceof Integer) + { + updQthreads( ( (Integer) p_oUsrObj).intValue()); + } + } //________________________________ + + private synchronized void updQthreads(int p_i) + { + m_iQthr += p_i; + } //________________________________ + + private void execute(DomElement p_oP) throws Exception + { + m_sb.setLength(m_iSbIni); + if (m_bError) + { + m_oLogger.warn(m_sb.append(" Skipping execution due to previous errors") + .toString()); + return; + } + checkParms(p_oP); + doYourJob (p_oP); + } //________________________________ + + protected void setMaxThreads(DomElement p_oP,int p_iMax) + { + String sAtt = p_oP.getAttr(PARM_MAX_THREADS); + m_iMaxThr = (null == sAtt) ? 1 : Integer.parseInt(sAtt); + m_iMaxThr = (m_iMaxThr < 1) ? 1 + : (m_iMaxThr > p_iMax) ? p_iMax + : m_iMaxThr; + + } //________________________________ + + // Very basic checkParms method + // Remember to call super.checkParms(p_oP) in derived checkParms() methods + // so your parameters will be cloned + // and to add REAL parameter checking + protected void checkParms(DomElement p_oP) throws Exception + { + m_sb.setLength(0); + m_oChParms = p_oP.cloneObj(); + setMaxThreads(p_oP,10); + } //________________________________ + + } //______________________________________________________ +} //____________________________________________________________________________ |