From: <jbo...@li...> - 2006-06-29 15:12:14
|
Author: estebanschifman Date: 2006-06-29 11:12:07 -0400 (Thu, 29 Jun 2006) New Revision: 4870 Added: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java Log: Recovered lost version of this listener Added: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java =================================================================== --- labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java 2006-06-29 15:11:40 UTC (rev 4869) +++ labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java 2006-06-29 15:12:07 UTC (rev 4870) @@ -0,0 +1,410 @@ +/* +* JBoss, Home of Professional Open Source +* Copyright 2006, JBoss Inc., and individual contributors as indicated +* by the @authors tag. See the copyright.txt in the distribution for a +* full listing of individual contributors. +* +* This is free software; you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as +* published by the Free Software Foundation; either version 2.1 of +* the License, or (at your option) any later version. +* +* This software is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +* Lesser General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public +* License along with this software; if not, write to the Free +* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA +* 02110-1301 USA, or see the FSF site: http://www.fsf.org. 
*/

package org.jboss.soa.esb.listeners;

import java.io.*;
import java.net.*;
import java.util.*;
import org.apache.log4j.*;
import javax.naming.*;
import javax.jms.*;

import org.jboss.soa.esb.util.*;
import org.jboss.soa.esb.common.*;
import org.jboss.soa.esb.helpers.*;
import org.jboss.soa.esb.parameters.*;
import org.jboss.soa.esb.processors.*;

//import org.jboss.soa.esb.nagios.*;

/**
 * Polls one or more input directories and starts a worker thread for every
 * file whose name ends with a configured suffix.
 *
 * <p>The listener loads a parameter tree from a {@link ParamsRepository};
 * each child element of that tree describes one group of workers (input
 * directory, file suffix, processor class, maximum number of threads).
 * The tree is reloaded every {@link #PARM_RELOAD_LTCY} seconds and the
 * directories are scanned every {@link #PARM_POLL_LTCY} seconds.
 *
 * <p>Note that the constructor itself runs the polling loop and therefore
 * does not return until a quiesce message has been received.
 */
public class DirListener
{
    /**
     * Command-line entry point.
     *
     * @param args {@code args[0]} is the identifier of the parameter tree
     *             to load from the parameters repository
     * @throws IllegalArgumentException if no identifier was supplied
     * @throws Exception if the initial parameter load fails
     */
    public static void main(String[] args) throws Exception
    {
        if (null == args || args.length < 1)
        {
            throw new IllegalArgumentException(
                "Usage: DirListener <parameters-uid>");
        }
        new DirListener(args[0]);
    }

    public static final String PARM_POLL_LTCY = "pollLatency";
    public static final String PARM_RELOAD_LTCY = "reloadLatency";

    public static final String PARM_MAX_THREADS = "maxThreads";
    public static final String PARM_FILE_PROCESSOR_CLASS = "fileProcessorClass";

    public static final String PARM_INPUT_DIR = "inputDirURI";
    public static final String PARM_SUFFIX = "inputSuffix";

    /** Reload interval used when {@link #PARM_RELOAD_LTCY} is absent. */
    private static final long DEFAULT_RELOAD_MILLIS = 180000L;
    /** Poll interval used when {@link #PARM_POLL_LTCY} is absent. */
    private static final long DEFAULT_POLL_MILLIS = 20000L;
    /** Lower bound applied to the configured poll interval. */
    private static final long MIN_POLL_MILLIS = 3000L;
    /** Upper bound applied to {@link #PARM_MAX_THREADS}. */
    private static final int MAX_THREADS_LIMIT = 10;

    /** One worker group per child element name of the parameter tree. */
    private Map<String,GroupOfChilds> m_omChildPrc
        = new HashMap<String,GroupOfChilds>();

    private ThreadGroup m_oThrGrp = Thread.currentThread().getThreadGroup();

    private Logger m_oLogger;
    private DomElement m_oParms;
    private ParamsRepository m_oParmRepos;

    private TopicConnection m_oTopicConn = null;
    private TopicSession m_oSession = null;
    private Topic m_oTopic = null;
    private TopicSubscriber m_oTopicSub = null;

    /**
     * Creates the listener and runs it: parameters are (re)loaded and the
     * configured directories polled until {@link #loadParmsCycle(Name)}
     * reports that a quiesce message arrived.
     *
     * @param p_sParamsUid identifier of the parameter tree in the repository
     * @throws Exception if the initial parameter load fails, or if the
     *                   polling thread is interrupted while pausing
     */
    public DirListener(String p_sParamsUid) throws Exception
    {
        m_oLogger = EsbUtil.getDefaultLogger(this.getClass());
        // NOTE(review): the quiesce subscription is disabled in this
        // revision, so the listener only pauses between polls - confirm
        // whether the call below should be re-enabled.
//      setupSubscribe();
        // The former Nagios heartbeat integration has been removed.

        String sFactoryClass = EsbSysProps.getParamsReposFactoryClass();
        m_oParmRepos = ParamsReposUtil.reposFromFactory(sFactoryClass, null);

        Name oParms = ParamsReposUtil.nameFromString(p_sParamsUid);
        while (loadParmsCycle(oParms))
        {
            // each iteration is one full load-and-poll cycle
        }
    }

    /**
     * Loads (or reloads) the parameter tree, then scans every configured
     * group repeatedly until the reload interval has elapsed.
     *
     * <p>If a reload fails, the previously loaded parameters stay in use;
     * if the very first load fails the exception is rethrown.
     *
     * @param p_oParams repository name of the parameter tree
     * @return {@code true} when the reload interval elapsed and another
     *         cycle should start; {@code false} when a quiesce message
     *         was received
     * @throws Exception if no parameters could be loaded at all
     */
    protected boolean loadParmsCycle(Name p_oParams) throws Exception
    {
        String sMsg = (null == m_oParms)
            ? "Initial Parameter loading" : "Reloading Params";
        m_oLogger.info(sMsg);

        try
        {
            DomElement oLoaded = m_oParmRepos.getElement(p_oParams);
            if (null == oLoaded)
            {
                // never replace a good parameter tree with nothing
                throw new Exception("No parameters found for " + p_oParams);
            }
            m_oParms = oLoaded;
        }
        catch (Exception e)
        {
            m_oLogger.warn("Failed to load parameters", e);
            if (null == m_oParms)
            {
                throw e;
            }
        }

        // 1000L: do the seconds-to-milliseconds conversion in long
        // arithmetic so that large configured values cannot overflow
        String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY);
        long lReload = (null != sAtt)
            ? (1000L * Integer.parseInt(sAtt)) : DEFAULT_RELOAD_MILLIS;
        long lNewLoad = System.currentTimeMillis() + lReload;
        DomElement[] oaParms = m_oParms.getAllElemChildren();

        sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
        long lPollLtcy = (null != sAtt)
            ? (1000L * Integer.parseInt(sAtt)) : DEFAULT_POLL_MILLIS;
        if (lPollLtcy < MIN_POLL_MILLIS)
        {
            lPollLtcy = MIN_POLL_MILLIS;
        }

        boolean bFirst = true;
        while (System.currentTimeMillis() <= lNewLoad)
        {
            for (DomElement oCurr : oaParms)
            {
                oneScan(oCurr, bFirst);
            }
            long lSlack = lNewLoad - System.currentTimeMillis();
            if (lSlack < 0)
            {
                break;
            }
            if (waitForQuiesce(Math.min(lSlack, lPollLtcy)))
            {
                return false;
            }
            bFirst = false;
        }
        return true;
    }

    /**
     * Subscribes to the {@code topic/quiesce} JMS topic, selecting messages
     * with {@code processor='DirListener'}.
     *
     * <p>Failures are logged rather than thrown; in that case any
     * connection that was already opened is closed again and the listener
     * runs without a quiesce subscription.
     *
     * @throws JMSException declared for interface compatibility
     * @throws NamingException declared for interface compatibility
     */
    public void setupSubscribe() throws JMSException, NamingException
    {
        // do not leak a connection left over from a previous call
        releaseJms();
        try
        {
            String sStopTopic = "topic/quiesce";
            String sSelector = "processor='DirListener'";

            String sJndiServer = EsbSysProps.getDefaultJndiServer();
            Context oJndiCtx = AppServerContext.getServerContext(sJndiServer);

            Object tmp = oJndiCtx.lookup("UIL2ConnectionFactory");
            TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
            m_oTopicConn = tcf.createTopicConnection();
            m_oTopic = (Topic) oJndiCtx.lookup(sStopTopic);
            m_oSession = m_oTopicConn.createTopicSession
                (false, TopicSession.AUTO_ACKNOWLEDGE);
            m_oTopicConn.start();
            m_oTopicSub = m_oSession.createSubscriber(m_oTopic, sSelector, true);
        }
        catch (Exception e)
        {
            m_oLogger.error("Problems connecting to JMS. ", e);
            releaseJms();
        }
    }

    /** Closes the JMS connection, if any, and clears all JMS fields. */
    private void releaseJms()
    {
        TopicConnection oConn = m_oTopicConn;
        m_oTopicSub = null;
        m_oSession = null;
        m_oTopic = null;
        m_oTopicConn = null;
        if (null != oConn)
        {
            try
            {
                oConn.close();
            }
            catch (JMSException eClose)
            {
                m_oLogger.warn("Problems closing JMS connection. ", eClose);
            }
        }
    }

    /**
     * Pauses for at most {@code p_lMillis} milliseconds, returning early
     * when a quiesce message arrives.
     *
     * <p>The time already spent waiting for a message is deducted from the
     * pause, so the total wait never exceeds the requested interval.
     *
     * @param p_lMillis maximum time to wait, in milliseconds
     * @return {@code true} if a quiesce message was received
     * @throws Exception ({@link InterruptedException}) if the thread is
     *                   interrupted while pausing
     */
    private boolean waitForQuiesce(long p_lMillis) throws Exception
    {
        long lStart = System.currentTimeMillis();
        try
        {
            if (null != secureReceive(p_lMillis))
            {
                m_oLogger.info("Starting Quiesce of Listener. ");
                return true;
            }
        }
        catch (Exception e)
        {
            m_oLogger.error("Problems with waitForQuiesce. ", e);
        }

        long lRemaining = p_lMillis - (System.currentTimeMillis() - lStart);
        if (lRemaining > 0)
        {
            Thread.sleep(lRemaining);
        }
        return false;
    }

    /**
     * Receives one message from the quiesce subscription without letting
     * JMS failures escape.
     *
     * @param p_lMillis maximum time to block, in milliseconds
     * @return the received message, or {@code null} when there is no
     *         subscription, nothing arrived in time, or JMS failed
     */
    private Object secureReceive(long p_lMillis) throws Exception
    {
        if (null == m_oTopicSub)
        {
            return null;
        }
        try
        {
            // receive(0) means "block forever" in JMS, so a zero (or
            // negative) timeout must be turned into a non-blocking poll
            return (p_lMillis > 0)
                ? m_oTopicSub.receive(p_lMillis)
                : m_oTopicSub.receiveNoWait();
        }
        catch (JMSException e)
        {
            m_oLogger.warn("Problems receiving from quiesce topic. ", e);
            return null;
        }
    }

    /**
     * Runs one scan for the worker group described by {@code p_oP},
     * creating the group on first use.
     *
     * @param p_oP     parameter element describing the group
     * @param p_bFirst {@code true} on the first scan after a (re)load;
     *                 clears the group's error flag so it is retried
     */
    private void oneScan(DomElement p_oP, boolean p_bFirst) throws Exception
    {
        String sPrcName = p_oP.getName();
        GroupOfChilds oCnt = m_omChildPrc.get(sPrcName);
        if (null == oCnt)
        {
            oCnt = new GroupOfChilds(findOrCreateThreadGroup(sPrcName));
            m_omChildPrc.put(sPrcName, oCnt);
        }

        if (p_bFirst)
        {
            oCnt.m_bError = false;
        }

        try
        {
            oCnt.execute(p_oP);
        }
        catch (Exception e)
        {
            // a failing group is skipped until the next parameter reload
            oCnt.m_bError = true;
            m_oLogger.error("GroupOfChilds.execute", e);
        }
    }

    /**
     * Returns the thread group named {@code p_sName} found below this
     * listener's thread group, or a newly created one if none exists.
     */
    private ThreadGroup findOrCreateThreadGroup(String p_sName)
    {
        // activeGroupCount() is only an estimate; enumerate() reports how
        // many entries were actually filled in
        ThreadGroup[] oaCh = new ThreadGroup[m_oThrGrp.activeGroupCount()];
        int iMax = m_oThrGrp.enumerate(oaCh);

        for (int i1 = 0; i1 < iMax; i1++)
        {
            String sName = oaCh[i1].getName();
            if (null != sName && sName.equals(p_sName))
            {
                return oaCh[i1];
            }
        }
        return new ThreadGroup(p_sName);
    }

    /**
     * The worker threads serving one input directory, together with the
     * settings validated for them on every scan.
     *
     * <p>Registered as {@link Observer} with each worker: an
     * {@link Integer} notification is added to the running-thread counter.
     * NOTE(review): presumably workers report +1 on start and -1 on
     * completion - confirm against OneChildProcess.
     */
    private class GroupOfChilds implements Observer
    {
        private ThreadGroup m_oThrGrp;
        private boolean m_bError = false;

        private File m_oInpDir;
        private FileFilter m_oFFilt;
        private Class m_oExecClass;
        private DomElement m_oChParms;
        private int m_iQthr = 0, m_iMaxThr;
        /** Message buffer; the first m_iSbIni characters are a fixed prefix. */
        private StringBuilder m_sb;
        private int m_iSbIni;

        private GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception
        {
            m_oThrGrp = p_oThrGrp;
            m_sb = new StringBuilder("GroupOfThreads ")
                .append(m_oThrGrp.getName()).append(" : ");
            m_iSbIni = m_sb.length();
        }

        /** Adds an {@link Integer} notification to the thread counter. */
        public void update(Observable p_oObs, Object p_oUsrObj)
        {
            if (p_oUsrObj instanceof Integer)
            {
                updQthreads(((Integer) p_oUsrObj).intValue());
            }
        }

        private synchronized void updQthreads(int p_i)
        {
            m_iQthr += p_i;
        }

        /** Reads the counter under the same lock that guards its updates. */
        private synchronized int getQthreads()
        {
            return m_iQthr;
        }

        /**
         * Validates the group's parameters, lists the matching files and
         * starts one worker thread per file until the thread limit is
         * reached.
         *
         * @param p_oP parameter element describing this group
         * @throws Exception if the parameters are invalid or the input
         *                   directory cannot be listed
         */
        private void execute(DomElement p_oP) throws Exception
        {
            m_sb.setLength(m_iSbIni);
            if (m_bError)
            {
                m_oLogger.warn(m_sb.append(" Skipping execution due to previous errors")
                               .toString());
                return;
            }
            checkParms(p_oP);

            File[] oaF = m_oInpDir.listFiles(m_oFFilt);
            if (null == oaF)
            {
                // listFiles() returns null when the directory vanished or
                // an I/O error occurred
                throw new IOException(m_sb.append("Unable to list directory ")
                                      .append(m_oInpDir).toString());
            }

            for (File oFcurr : oaF)
            {
                if (getQthreads() >= m_iMaxThr)
                {
                    m_oLogger.info(m_sb.append("Waiting for available threads").toString());
                    Thread.sleep(5000);
                    break;
                }

                // Every worker gets its own copy of the parameters, so
                // preparing the next file cannot alter what a thread that
                // is already running sees.
                DomElement oThrParms = m_oChParms.cloneObj();
                oThrParms.rmvChildsByName(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
                DomElement oThisProc = new DomElement(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
                oThisProc.setAttr(ParamsFileProcessor.FPRC_FILENAME, oFcurr.getName());
                oThrParms.addElemChild(oThisProc);

                new Thread(m_oThrGrp,
                           new OneChildProcess(m_oExecClass, this, oThrParms)).start();
                Thread.sleep(500);
            }
        }

        /**
         * Reads and validates this group's settings: thread limit (capped
         * at 10), input directory, file suffix and processor class, which
         * must offer a public constructor taking a {@link DomElement}.
         *
         * @param p_oP parameter element describing this group
         * @throws Exception describing the first invalid setting found
         */
        private void checkParms(DomElement p_oP) throws Exception
        {
            String sAtt = p_oP.getAttr(PARM_MAX_THREADS);
            m_iMaxThr = (null == sAtt) ? 1 : Integer.parseInt(sAtt);
            if (m_iMaxThr > MAX_THREADS_LIMIT)
            {
                m_iMaxThr = MAX_THREADS_LIMIT;
            }

            String sUid = p_oP.getName();
            m_oChParms = p_oP.cloneObj();

            sAtt = requireAttr(m_oChParms, PARM_INPUT_DIR, sUid);
            m_oInpDir = new File(new URI(sAtt));
            if (!m_oInpDir.isDirectory())
            {
                throw new Exception(m_sb.append(sAtt).append(" is not a directory")
                                    .toString());
            }
            if (!m_oInpDir.canRead())
            {
                throw new Exception(m_sb.append("Can't read directory ").append(sAtt)
                                    .toString());
            }

            m_oFFilt = new FileEndsWith(requireAttr(m_oChParms, PARM_SUFFIX, sUid));

            m_oExecClass = null;
            sAtt = requireAttr(p_oP, PARM_FILE_PROCESSOR_CLASS, sUid);
            try
            {
                m_oExecClass = Class.forName(sAtt);
            }
            catch (ClassNotFoundException e)
            {
                throw new Exception(m_sb.append("Class ").append(sAtt)
                                    .append(" not found in classpath").toString(), e);
            }
            try
            {
                m_oExecClass.getConstructor(new Class[] {DomElement.class});
            }
            catch (NoSuchMethodException eN)
            {
                throw new Exception(m_sb.append("No appropriate constructor")
                                    .append(" (DomElement) found for class ")
                                    .append(sAtt).toString(), eN);
            }
        }

        /**
         * Returns the named attribute of {@code p_oElem}.
         *
         * @throws Exception naming the attribute and the parameter
         *                   element when the attribute is absent
         */
        private String requireAttr(DomElement p_oElem, String p_sAttName,
                                   String p_sUid) throws Exception
        {
            String sValue = p_oElem.getAttr(p_sAttName);
            if (null == sValue)
            {
                throw new Exception(m_sb.append("Missing ").append(p_sAttName)
                                    .append(" attribute in -parameters ")
                                    .append(p_sUid).toString());
            }
            return sValue;
        }

        /** Accepts regular files whose path ends with a given suffix. */
        private class FileEndsWith implements FileFilter
        {
            String m_sSuffix;

            FileEndsWith(String p_sEnd) throws Exception
            {
                m_sSuffix = p_sEnd;
                if (null == m_sSuffix)
                {
                    throw new Exception("Must specify file extension");
                }
            }

            public boolean accept(File p_f)
            {
                if (!p_f.isFile())
                {
                    return false;
                }
                return p_f.toString().endsWith(m_sSuffix);
            }
        }
    }
}