From: <jbo...@li...> - 2006-06-29 15:11:46
|
Author: estebanschifman Date: 2006-06-29 11:11:40 -0400 (Thu, 29 Jun 2006) New Revision: 4869 Removed: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java Log: Recovered lost version of this listener Deleted: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java =================================================================== --- labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java 2006-06-29 11:37:43 UTC (rev 4868) +++ labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java 2006-06-29 15:11:40 UTC (rev 4869) @@ -1,410 +0,0 @@ -/* -* JBoss, Home of Professional Open Source -* Copyright 2006, JBoss Inc., and individual contributors as indicated -* by the @authors tag. See the copyright.txt in the distribution for a -* full listing of individual contributors. -* -* This is free software; you can redistribute it and/or modify it -* under the terms of the GNU Lesser General Public License as -* published by the Free Software Foundation; either version 2.1 of -* the License, or (at your option) any later version. -* -* This software is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -* Lesser General Public License for more details. -* -* You should have received a copy of the GNU Lesser General Public -* License along with this software; if not, write to the Free -* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA -* 02110-1301 USA, or see the FSF site: http://www.fsf.org. 
*/

package org.jboss.soa.esb.listeners;

import java.io.*;
import java.net.*;
import java.util.*;
import org.apache.log4j.*;
import javax.naming.*;
import javax.jms.*;

import org.jboss.soa.esb.util.*;
import org.jboss.soa.esb.common.*;
import org.jboss.soa.esb.helpers.*;
import org.jboss.soa.esb.parameters.*;
import org.jboss.soa.esb.processors.*;

//import org.jboss.soa.esb.nagios.*;

/**
 * Polls one or more input directories and starts a processor thread for every
 * file whose name ends with a configured suffix.
 *
 * <p>Configuration is read from a parameter repository. The root element may
 * carry the attributes {@link #PARM_RELOAD_LTCY} and {@link #PARM_POLL_LTCY}
 * (both in seconds); every child element of the root describes one group of
 * processors through {@link #PARM_INPUT_DIR}, {@link #PARM_SUFFIX},
 * {@link #PARM_FILE_PROCESSOR_CLASS} and {@link #PARM_MAX_THREADS}.
 *
 * <p>NOTE: the constructor does not return while the listener is running; it
 * loops until a quiesce message is received or an exception escapes.
 */
public class DirListener
{
  /**
   * Starts a listener.
   *
   * @param args {@code args[0]} is the name (uid) of the parameter tree to load
   * @throws IllegalArgumentException if no parameter name is supplied
   * @throws Exception whatever the listener itself propagates
   */
  public static void main(String[] args) throws Exception
  {
    if (null == args || args.length < 1)
    {
      throw new IllegalArgumentException("Usage: DirListener <parameters-uid>");
    }
    new DirListener(args[0]);
  } //________________________________

  public static final String PARM_POLL_LTCY = "pollLatency";
  public static final String PARM_RELOAD_LTCY = "reloadLatency";

  public static final String PARM_MAX_THREADS = "maxThreads";
  public static final String PARM_FILE_PROCESSOR_CLASS = "fileProcessorClass";

  public static final String PARM_INPUT_DIR = "inputDirURI";
  public static final String PARM_SUFFIX = "inputSuffix";

  /** Reload interval used when {@link #PARM_RELOAD_LTCY} is absent. */
  private static final long DEFAULT_RELOAD_MILLIS = 180000L;
  /** Poll interval used when {@link #PARM_POLL_LTCY} is absent. */
  private static final long DEFAULT_POLL_MILLIS = 20000L;
  /** Lower bound applied to the poll interval. */
  private static final long MIN_POLL_MILLIS = 3000L;
  /** Upper bound applied to {@link #PARM_MAX_THREADS}. */
  private static final int MAX_THREADS_CAP = 10;

  private static final String QUIESCE_TOPIC = "topic/quiesce";
  private static final String QUIESCE_SELECTOR = "processor='DirListener'";

  /** One entry per child element name found in the parameter tree. */
  private Map<String,GroupOfChilds> m_omChildPrc
      = new HashMap<String,GroupOfChilds>();

  private ThreadGroup m_oThrGrp = Thread.currentThread().getThreadGroup();

  private Logger m_oLogger;
  private DomElement m_oParms;
  private ParamsRepository m_oParmRepos;

  private TopicConnection m_oTopicConn = null;
  private TopicSession m_oSession = null;
  private Topic m_oTopic = null;
  private TopicSubscriber m_oTopicSub = null;

  /**
   * Loads the parameters named by {@code p_sParamsUid} and keeps scanning
   * until {@link #loadParmsCycle(Name)} reports that the listener must stop.
   *
   * <p>{@link #setupSubscribe()} is deliberately NOT called here (the call was
   * already disabled in the original code), so unless a caller invokes it the
   * quiesce wait is a plain sleep. The former Nagios heartbeat integration
   * has been removed as well.
   *
   * @param p_sParamsUid name of the parameter tree in the repository
   * @throws Exception if the initial parameter load fails or a scan cycle
   *         propagates an exception
   */
  public DirListener(String p_sParamsUid) throws Exception
  {
    m_oLogger = EsbUtil.getDefaultLogger(this.getClass());

    String sFactoryClass = EsbSysProps.getParamsReposFactoryClass();
    m_oParmRepos = ParamsReposUtil.reposFromFactory(sFactoryClass, null);

    Name oParms = ParamsReposUtil.nameFromString(p_sParamsUid);
    while (loadParmsCycle(oParms))
    {
      // each iteration is one "load parameters, then poll until reload" cycle
    }
  } //__________________________________

  /**
   * Runs one cycle: (re)loads the parameters, then scans all configured
   * groups repeatedly until the reload interval has elapsed.
   *
   * @param p_oParams repository name of the parameter tree
   * @return {@code true} if another cycle should follow, {@code false} if a
   *         quiesce message was received
   * @throws Exception if the very first load fails, if a latency attribute is
   *         not a valid integer, or if the wait is interrupted
   */
  protected boolean loadParmsCycle(Name p_oParams) throws Exception
  {
    m_oLogger.info((null == m_oParms)
        ? "Initial Parameter loading" : "Reloading Params");

    try
    {
      m_oParms = m_oParmRepos.getElement(p_oParams);
    }
    catch (Exception e)
    {
      m_oLogger.warn("Failed to load parameters", e);
      if (null == m_oParms)
      {
        // nothing to fall back on - a failed reload keeps the previous tree
        throw e;
      }
    }

    long lNewLoad = System.currentTimeMillis()
        + secondsAttrToMillis(m_oParms.getAttr(PARM_RELOAD_LTCY), DEFAULT_RELOAD_MILLIS);
    DomElement[] oaParms = m_oParms.getAllElemChildren();

    long lPollLtcy = Math.max(MIN_POLL_MILLIS,
        secondsAttrToMillis(m_oParms.getAttr(PARM_POLL_LTCY), DEFAULT_POLL_MILLIS));

    // On the first pass after a (re)load every group gets its error flag
    // cleared, so a group that failed earlier is retried with fresh parameters.
    boolean bFirst = true;
    while (System.currentTimeMillis() <= lNewLoad)
    {
      for (DomElement oCurr : oaParms)
      {
        oneScan(oCurr, bFirst);
      }
      long lSlack = lNewLoad - System.currentTimeMillis();
      if (lSlack < 0)
      {
        break;
      }
      if (waitForQuiesce(Math.min(lSlack, lPollLtcy)))
      {
        return false;
      }
      bFirst = false;
    }
    return true;
  } //_________________________________________

  /**
   * Converts an attribute holding a number of seconds into milliseconds.
   * The multiplication is done in {@code long} arithmetic so that large
   * values do not overflow.
   *
   * @param p_sSeconds attribute value, may be {@code null}
   * @param p_lDefaultMillis value returned when the attribute is absent
   * @throws NumberFormatException if the attribute is not a valid integer
   */
  private static long secondsAttrToMillis(String p_sSeconds, long p_lDefaultMillis)
  {
    return (null == p_sSeconds)
        ? p_lDefaultMillis : 1000L * Integer.parseInt(p_sSeconds);
  } //_________________________________________

  /**
   * Subscribes to the {@code topic/quiesce} topic with the selector
   * {@code processor='DirListener'}. Any failure is logged, not propagated;
   * in that case every JMS resource acquired so far is released and the
   * listener keeps running without quiesce support.
   *
   * @throws JMSException declared for interface compatibility; not thrown by
   *         the current implementation
   * @throws NamingException declared for interface compatibility; not thrown
   *         by the current implementation
   */
  public void setupSubscribe() throws JMSException, NamingException
  {
    try
    {
      String sJndiServer = EsbSysProps.getDefaultJndiServer();
      Context oJndiCtx = AppServerContext.getServerContext(sJndiServer);

      TopicConnectionFactory tcf
          = (TopicConnectionFactory) oJndiCtx.lookup("UIL2ConnectionFactory");
      m_oTopicConn = tcf.createTopicConnection();
      m_oTopic = (Topic) oJndiCtx.lookup(QUIESCE_TOPIC);
      m_oSession = m_oTopicConn.createTopicSession
          (false, TopicSession.AUTO_ACKNOWLEDGE);
      m_oTopicConn.start();
      m_oTopicSub = m_oSession.createSubscriber(m_oTopic, QUIESCE_SELECTOR, true);
    }
    catch (Exception e)
    {
      m_oLogger.error("Problems connecting to JMS. ", e);
      releaseJms();
    }
  } //_________________________________________

  /** Closes the topic connection (if any) and forgets all JMS objects. */
  private void releaseJms()
  {
    if (null != m_oTopicConn)
    {
      try
      {
        m_oTopicConn.close();
      }
      catch (JMSException eClose)
      {
        m_oLogger.warn("Problems closing JMS connection", eClose);
      }
    }
    m_oTopicSub = null;
    m_oSession = null;
    m_oTopic = null;
    m_oTopicConn = null;
  } //_________________________________________

  /**
   * Waits up to {@code p_lMillis} for a quiesce message.
   *
   * <p>The total time spent here is bounded by {@code p_lMillis}: whatever
   * part of the interval was not consumed by the JMS receive is slept
   * afterwards. This also covers the case where there is no subscriber or the
   * receive fails immediately.
   *
   * @return {@code true} if a quiesce message arrived
   * @throws Exception (InterruptedException) if the sleep is interrupted
   */
  private boolean waitForQuiesce(long p_lMillis) throws Exception
  {
    long lDeadline = System.currentTimeMillis() + p_lMillis;
    try
    {
      if (null != secureReceive(p_lMillis))
      {
        m_oLogger.info("Starting Quiesce of Listener. ");
        return true;
      }
    }
    catch (Exception e)
    {
      m_oLogger.error("Problems with waitForQuiesce. ", e);
    }

    long lRemaining = lDeadline - System.currentTimeMillis();
    if (lRemaining > 0)
    {
      Thread.sleep(lRemaining);
    }
    return false;
  } //_________________________________________

  /**
   * Receives from the quiesce subscriber without letting a JMS failure
   * escape.
   *
   * @return the received message, or {@code null} if there is no subscriber,
   *         nothing arrived in time, or the receive failed
   */
  private Object secureReceive(long p_lMillis) throws Exception
  {
    if (null == m_oTopicSub)
    {
      return null;
    }
    try
    {
      // In JMS a timeout of 0 means "block forever"; never do that here.
      return (p_lMillis > 0)
          ? m_oTopicSub.receive(p_lMillis) : m_oTopicSub.receiveNoWait();
    }
    catch (JMSException e)
    {
      m_oLogger.warn("JMS receive on quiesce topic failed", e);
      return null;
    }
  } //_________________________________________

  /**
   * Runs one scan for the processor group described by {@code p_oP},
   * creating the group on first use. A failing scan marks the group as
   * erroneous; it is then skipped until the next pass with
   * {@code p_bFirst == true}.
   *
   * @param p_oP parameters of the group; its element name identifies the group
   * @param p_bFirst {@code true} on the first pass after a parameter (re)load
   */
  private void oneScan(DomElement p_oP, boolean p_bFirst) throws Exception
  {
    String sPrcName = p_oP.getName();
    GroupOfChilds oCnt = m_omChildPrc.get(sPrcName);
    if (null == oCnt)
    {
      oCnt = new GroupOfChilds(findOrCreateThreadGroup(sPrcName));
      m_omChildPrc.put(sPrcName, oCnt);
    }

    if (p_bFirst)
    {
      oCnt.m_bError = false;
    }

    try
    {
      oCnt.execute(p_oP);
    }
    catch (Exception e)
    {
      oCnt.m_bError = true;
      m_oLogger.error("GroupOfChilds.execute", e);
    }
  } //_________________________________________

  /**
   * Returns the active thread group below {@link #m_oThrGrp} whose name is
   * {@code p_sName}, or a new child group of that name if none exists.
   */
  private ThreadGroup findOrCreateThreadGroup(String p_sName)
  {
    ThreadGroup[] oaCh = new ThreadGroup[m_oThrGrp.activeGroupCount()];
    int iMax = m_oThrGrp.enumerate(oaCh);
    for (int i1 = 0; i1 < iMax; i1++)
    {
      // compare the enumerated group's name, not the parent's
      if (null != p_sName && p_sName.equals(oaCh[i1].getName()))
      {
        return oaCh[i1];
      }
    }
    return new ThreadGroup(m_oThrGrp, p_sName);
  } //_________________________________________

  /**
   * The processors configured by one child element of the parameter tree:
   * validates that element's attributes and starts one thread per matching
   * input file, up to a maximum number of concurrently counted threads.
   *
   * <p>The thread count is adjusted through {@link #update(Observable, Object)};
   * presumably {@code OneChildProcess} notifies with +1 / -1 when a child
   * starts / ends - TODO confirm against that class.
   */
  private class GroupOfChilds implements Observer
  {
    private ThreadGroup m_oThrGrp;
    private boolean m_bError = false;

    private File m_oInpDir;
    private FileFilter m_oFFilt;
    private Class<?> m_oExecClass;
    private DomElement m_oChParms;
    private int m_iQthr = 0, m_iMaxThr;
    /** Common prefix of every log and exception message of this group. */
    private final String m_sPrefix;

    private GroupOfChilds(ThreadGroup p_oThrGrp)
    {
      m_oThrGrp = p_oThrGrp;
      m_sPrefix = "GroupOfThreads " + m_oThrGrp.getName() + " : ";
    } //________________________________

    /**
     * Observer callback: an {@code Integer} argument is added to the count
     * of running threads; any other argument is ignored.
     */
    public void update(Observable p_oObs, Object p_oUsrObj)
    {
      if (p_oUsrObj instanceof Integer)
      {
        updQthreads( ( (Integer) p_oUsrObj).intValue());
      }
    } //________________________________

    private synchronized void updQthreads(int p_i)
    {
      m_iQthr += p_i;
    } //________________________________

    /** Reads the thread count under the same lock that guards its updates. */
    private synchronized int activeThreads()
    {
      return m_iQthr;
    } //________________________________

    /**
     * Validates the parameters and starts one thread per file found in the
     * input directory. Stops early (after a 5 second pause) as soon as the
     * thread limit is reached; remaining files are left for the next scan.
     *
     * @throws Exception if the parameters are invalid or the directory
     *         cannot be listed
     */
    private void execute(DomElement p_oP) throws Exception
    {
      if (m_bError)
      {
        m_oLogger.warn(m_sPrefix + " Skipping execution due to previous errors");
        return;
      }
      checkParms(p_oP);

      File[] oaF = m_oInpDir.listFiles(m_oFFilt);
      if (null == oaF)
      {
        // listFiles signals an I/O error by returning null
        throw new IOException(m_sPrefix + "Unable to list files in " + m_oInpDir);
      }

      for (File oFcurr : oaF)
      {
        if (activeThreads() >= m_iMaxThr)
        {
          m_oLogger.info(m_sPrefix + "Waiting for available threads");
          Thread.sleep(5000);
          break;
        }
        // Every child gets its own copy of the parameters, so that preparing
        // the next file cannot alter what an already started child sees.
        DomElement oChildParms = m_oChParms.cloneObj();
        oChildParms.rmvChildsByName(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
        DomElement oThisProc = new DomElement(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
        oThisProc.setAttr(ParamsFileProcessor.FPRC_FILENAME, oFcurr.getName());
        oChildParms.addElemChild(oThisProc);

        new Thread(m_oThrGrp,
            new OneChildProcess(m_oExecClass, this, oChildParms)).start();
        // pacing between thread starts, kept from the original implementation
        Thread.sleep(500);
      }
    } //________________________________

    /**
     * Reads and validates the group's attributes, populating the thread
     * limit, input directory, file filter and processor class.
     *
     * @throws Exception if a mandatory attribute is missing, the input
     *         directory is not a readable directory, or the processor class
     *         cannot be loaded or lacks a public (DomElement) constructor
     */
    private void checkParms(DomElement p_oP) throws Exception
    {
      String sAtt = p_oP.getAttr(PARM_MAX_THREADS);
      m_iMaxThr = (null == sAtt) ? 1 : Integer.parseInt(sAtt);
      if (m_iMaxThr > MAX_THREADS_CAP)
      {
        m_iMaxThr = MAX_THREADS_CAP;
      }

      String sUid = p_oP.getName();
      m_oChParms = p_oP.cloneObj();

      sAtt = m_oChParms.getAttr(PARM_INPUT_DIR);
      if (null == sAtt)
      {
        throw new Exception(m_sPrefix + "Missing " + PARM_INPUT_DIR
            + " attribute in -parameters " + sUid);
      }
      m_oInpDir = new File(new URI(sAtt));
      if (!m_oInpDir.isDirectory())
      {
        throw new Exception(m_sPrefix + sAtt + " is not a directory");
      }
      if (!m_oInpDir.canRead())
      {
        throw new Exception(m_sPrefix + "Can't read directory " + sAtt);
      }

      sAtt = m_oChParms.getAttr(PARM_SUFFIX);
      if (null == sAtt)
      {
        throw new Exception(m_sPrefix + "Missing " + PARM_SUFFIX
            + " attribute in -parameters " + sUid);
      }
      m_oFFilt = new FileEndsWith(sAtt);

      sAtt = p_oP.getAttr(PARM_FILE_PROCESSOR_CLASS);
      m_oExecClass = null;
      if (null == sAtt)
      {
        throw new Exception(m_sPrefix + "Missing " + PARM_FILE_PROCESSOR_CLASS
            + " attribute");
      }
      try
      {
        m_oExecClass = Class.forName(sAtt);
      }
      catch (ClassNotFoundException e)
      {
        throw new Exception(m_sPrefix + "Class " + sAtt
            + " not found in classpath", e);
      }
      try
      {
        m_oExecClass.getConstructor(DomElement.class);
      }
      catch (NoSuchMethodException eN)
      {
        throw new Exception(m_sPrefix + "No appropriate constructor"
            + " (DomElement) found for class " + sAtt, eN);
      }
    } //________________________________

    /** Accepts regular files whose path ends with a given suffix. */
    private class FileEndsWith implements FileFilter
    {
      String m_sSuffix;

      /**
       * @param p_sEnd required suffix
       * @throws Exception if {@code p_sEnd} is {@code null}
       */
      FileEndsWith(String p_sEnd) throws Exception
      {
        m_sSuffix = p_sEnd;
        if (null == m_sSuffix)
        {
          throw new Exception("Must specify file extension");
        }
      } //_________________________________________

      public boolean accept(File p_f)
      {
        return p_f.isFile() && p_f.toString().endsWith(m_sSuffix);
      } //_________________________________________
    } //___________________________________________________
  } //______________________________________________________
} //____________________________________________________________________________