From: <jbo...@li...> - 2006-07-03 20:15:19
|
Author: estebanschifman Date: 2006-07-03 16:15:11 -0400 (Mon, 03 Jul 2006) New Revision: 4903 Added: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/OldDirListener.java Removed: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/OneChildProcess.java Modified: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java Log: Renamed old 'DirListener' to 'OldDirListener' Got rid of the stand alone OneChildProcess class. It's now an inner class Moved some methods/variables from BetterListener to AbstractPoller Modified: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java =================================================================== --- labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-07-03 16:58:38 UTC (rev 4902) +++ labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-07-03 20:15:11 UTC (rev 4903) @@ -23,6 +23,7 @@ package org.jboss.soa.esb.listeners; +import java.lang.reflect.Constructor; import java.util.*; import org.apache.log4j.*; @@ -34,13 +35,23 @@ import org.jboss.soa.esb.common.*; import org.jboss.soa.esb.helpers.*; import org.jboss.soa.esb.parameters.*; +import org.jboss.soa.esb.processors.EsbFileProcessor; public abstract class AbstractPoller { - protected abstract void executeOneCycle()throws Exception; protected abstract GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception; + // You can override these three values at constructor time of your + // derived class after calling super(String) + protected int m_iMinPollMillis = 3000 // minimum polling interval + ,m_iDfltPollMillis = 20000 // default polling interval + ,m_iDfltReloadMillis= 180000 // default interval between + // parameter reloading + ; + + public static final String PARM_ACTION_CLASS = "actionClass"; + public static final String PARM_POLL_LTCY = "pollLatency"; public static final String PARM_RELOAD_LTCY = "reloadLatency"; @@ -63,6 +74,7 @@ protected TopicSession m_oSession; protected Topic m_oTopic; protected TopicSubscriber m_oTopicSubs; + protected AbstractPoller(String p_sParamsUid) throws Exception @@ -97,6 +109,45 @@ } } //__________________________________ + private void executeOneCycle() throws Exception + { + String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY); + long lNewLoad = System.currentTimeMillis() + + ( (null != sAtt) + ? (1000 * Integer.parseInt(sAtt)) + : m_iDfltReloadMillis + ); + + DomElement[] oaParms = m_oParms.getAllElemChildren(); + + sAtt = m_oParms.getAttr(PARM_POLL_LTCY); + long lPollLtcy = (null != sAtt) + ? (1000 * Integer.parseInt(sAtt)) + : m_iDfltPollMillis; + + if (lPollLtcy < m_iMinPollMillis) + lPollLtcy = m_iMinPollMillis; + + boolean bFirst = true; + while (System.currentTimeMillis() <= lNewLoad) + { + for (DomElement oCurr : oaParms) + { + oneScan(oCurr, bFirst); + } + long lSlack = lNewLoad - System.currentTimeMillis(); + if (lSlack < 0) + { + break; + } + if (waitForQuiesce(Math.min(lSlack, lPollLtcy))) + { m_bEndRequested = true; + return; + } + bFirst = false; + } + } //_________________________________________ + protected String formatLogMsg(String p_s) { return new StringBuilder("Processor '") .append(EsbUtil.classSuffix(this.getClass())).append("' <") @@ -220,14 +271,14 @@ public static final String PARM_MAX_THREADS = "maxThreads"; - protected ThreadGroup m_oThrGrp; - protected boolean m_bError = false; + protected ThreadGroup m_oThrGrp; + protected boolean m_bError = false; protected Class m_oExecClass; protected DomElement m_oChParms; protected int m_iQthr = 0, m_iMaxThr; - protected StringBuilder m_sb; + protected StringBuilder m_sb; protected int m_iSbIni; protected GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception @@ -280,10 +331,37 @@ // and to add REAL parameter checking protected void checkParms(DomElement p_oP) throws Exception { - m_sb.setLength(0); + m_sb.setLength(m_iSbIni); m_oChParms = p_oP.cloneObj(); setMaxThreads(p_oP,10); } //________________________________ - } //______________________________________________________ + } //______________________________________________________ + + protected static class ChildProcess extends Observable implements Runnable + { private Class m_oExecClass; + private DomElement m_oParms; + private Logger m_oLogger; + public ChildProcess(Class p_oExec, Observer p_oObs, DomElement p_oP) + { m_oLogger = EsbUtil.getDefaultLogger(this.getClass()); + m_oExecClass = p_oExec; + this.addObserver(p_oObs); + m_oParms = p_oP; + setChanged(); + notifyObservers(new Integer(1)); + } //__________________________________ + + public void run() + { try + { Constructor oCnst = m_oExecClass.getConstructor(new Class[] {DomElement.class}); + Object oProc = oCnst.newInstance(new Object[] {m_oParms}); + ((EsbFileProcessor)oProc).execute(); + } + catch (Exception e) { m_oLogger.error("run() FAILED",e); } + + setChanged(); + notifyObservers(new Integer(-1)); + } //__________________________________ + } //______________________________________________________ + } //____________________________________________________________________________ Modified: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java =================================================================== --- labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java 2006-07-03 16:58:38 UTC (rev 4902) +++ labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java 2006-07-03 20:15:11 UTC (rev 4903) @@ -36,54 +36,16 @@ new BetterDirListener(args[0]); } //________________________________ - public static final String PARM_FILE_PROCESSOR_CLASS = "fileProcessorClass"; - public static final String PARM_INPUT_DIR = "inputDirURI"; - public static final String PARM_SUFFIX = "inputSuffix"; - public BetterDirListener(String p_sParamsUid) throws Exception { super(p_sParamsUid); + m_iDfltReloadMillis = 180000; + m_iDfltPollMillis = 20000; + m_iMinPollMillis = 5000; // See superclass - It provides ability to request end by subscribing to a Topic runUntilEndRequested(); } //__________________________________ - @Override - protected void executeOneCycle() throws Exception - { - String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY); - long lNewLoad = System.currentTimeMillis() - + ( (null != sAtt) ? (1000 * Integer.parseInt(sAtt)) : 180000); - DomElement[] oaParms = m_oParms.getAllElemChildren(); - - sAtt = m_oParms.getAttr(PARM_POLL_LTCY); - long lPollLtcy = (null != sAtt) - ? (1000 * Integer.parseInt(sAtt)) - : 20000; // if poll latency was not there, do it every 20 secs - - if (lPollLtcy < 3000) - lPollLtcy = 3000; // but not too often - - boolean bFirst = true; - while (System.currentTimeMillis() <= lNewLoad) - { - for (DomElement oCurr : oaParms) - { - super.oneScan(oCurr, bFirst); - } - long lSlack = lNewLoad - System.currentTimeMillis(); - if (lSlack < 0) - { - break; - } - if (waitForQuiesce(Math.min(lSlack, lPollLtcy))) - { m_bEndRequested = true; - return; - } - bFirst = false; - } - } //_________________________________________ - - @Override protected GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception { return new MyChildGroup(pThG); @@ -91,6 +53,10 @@ private class MyChildGroup extends AbstractPoller.GroupOfChilds { + public static final String PARM_FILE_PROCESSOR_CLASS = "fileProcessorClass"; + public static final String PARM_INPUT_DIR = "inputDirURI"; + public static final String PARM_SUFFIX = "inputSuffix"; + private File m_oInpDir; private FileFilter m_oFFilt; @@ -100,14 +66,7 @@ @Override protected void doYourJob(DomElement p_oP) throws Exception - { m_sb.setLength(m_iSbIni); - if (m_bError) - { - m_oLogger.warn(formatLogMsg(" Skipping execution due to previous errors")); - return; - } - checkParms(p_oP); - + { File[] oaF = m_oInpDir.listFiles(m_oFFilt); for (File oFcurr : oaF) @@ -124,17 +83,16 @@ m_oChParms.addElemChild(oThisProc); new Thread(m_oThrGrp, - new OneChildProcess(m_oExecClass, this, m_oChParms)).start(); + new ChildProcess(m_oExecClass, this, m_oChParms)).start(); Thread.sleep(500); } - } + } //________________________________ protected void checkParms(DomElement p_oP) throws Exception { super.checkParms(p_oP); - String sAtt = p_oP.getAttr(PARM_MAX_THREADS); - sAtt = m_oChParms.getAttr(PARM_INPUT_DIR); + String sAtt = m_oChParms.getAttr(PARM_INPUT_DIR); if (null == sAtt) { throw new Exception(formatLogMsg( m_sb.append("Missing ").append(PARM_INPUT_DIR) @@ -168,7 +126,7 @@ m_oExecClass = null; if (null == sAtt) { throw new Exception(formatLogMsg( - m_sb.append("Missing actionClass attribute"). + m_sb.append("Missing fileProcessorClass attribute"). toString())); } try Deleted: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java =================================================================== --- labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java 2006-07-03 16:58:38 UTC (rev 4902) +++ labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java 2006-07-03 20:15:11 UTC (rev 4903) @@ -1,411 +0,0 @@ -/* -* JBoss, Home of Professional Open Source -* Copyright 2006, JBoss Inc., and individual contributors as indicated -* by the @authors tag. See the copyright.txt in the distribution for a -* full listing of individual contributors. -* -* This is free software; you can redistribute it and/or modify it -* under the terms of the GNU Lesser General Public License as -* published by the Free Software Foundation; either version 2.1 of -* the License, or (at your option) any later version. -* -* This software is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -* Lesser General Public License for more details. -* -* You should have received a copy of the GNU Lesser General Public -* License along with this software; if not, write to the Free -* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA -* 02110-1301 USA, or see the FSF site: http://www.fsf.org. -*/ - - -package org.jboss.soa.esb.listeners; - -import java.io.*; -import java.net.*; -import java.util.*; -import org.apache.log4j.*; -import javax.naming.*; -import javax.jms.*; - -import org.jboss.soa.esb.util.*; -import org.jboss.soa.esb.common.*; -import org.jboss.soa.esb.helpers.*; -import org.jboss.soa.esb.parameters.*; -import org.jboss.soa.esb.processors.*; - -//import org.jboss.soa.esb.nagios.*; - -public class DirListener -{ - public static void main(String[] args) throws Exception - { - new DirListener(args[0]); - } //________________________________ - - public static final String PARM_POLL_LTCY = "pollLatency"; - public static final String PARM_RELOAD_LTCY = "reloadLatency"; - - public static final String PARM_MAX_THREADS = "maxThreads"; - public static final String PARM_FILE_PROCESSOR_CLASS = "fileProcessorClass"; - - public static final String PARM_INPUT_DIR = "inputDirURI"; - public static final String PARM_SUFFIX = "inputSuffix"; - - private Map<String,GroupOfChilds> m_omChildPrc - = new HashMap<String,GroupOfChilds>(); - - private ThreadGroup m_oThrGrp = Thread.currentThread().getThreadGroup(); - - private Logger m_oLogger; - private DomElement m_oParms; - private ParamsRepository m_oParmRepos; - - private TopicConnection m_oTopicConn = null; - private TopicSession m_oSession = null; - private Topic m_oTopic = null; - private TopicSubscriber m_oTopicSub = null; - - public DirListener(String p_sParamsUid) throws Exception - { - m_oLogger = EsbUtil.getDefaultLogger(this.getClass()); -// setupSubscribe(); - /* - * Removing Nagios integration - try - { - String sNagServer = System.getProperty("jbossEsb.nagios.server"); - if (sNagServer != null) - { - int iNagPort = Integer.parseInt - (System.getProperty("jbossEsb.nagios.port","5667")); - String sNagService = - System.getProperty("jbossEsb.nagios.service"); - new NagiosStandaloneHeartbeat( - sNagServer, iNagPort, sNagService, "rosetta-listener-service"); - } - } - catch (Exception eNagios) - { - m_oLogger.info("Problems with Nagios Notification", eNagios); - } - */ - - String sFactoryClass = EsbSysProps.getParamsReposFactoryClass(); - m_oParmRepos = ParamsReposUtil.reposFromFactory(sFactoryClass,null); - - Name oParms = ParamsReposUtil.nameFromString(p_sParamsUid); - while (loadParmsCycle(oParms)) - { - } - } //__________________________________ - - protected boolean loadParmsCycle(Name p_oParams) throws Exception - { - String sMsg = (null == m_oParms) - ? "Initial Parameter loading" : "Reloading Params"; - m_oLogger.info(sMsg); - - try - { - m_oParms = m_oParmRepos.getElement(p_oParams); - } - catch (Exception e) - { - m_oLogger.warn("Failed to load parameters"); - if (null == m_oParms) - { - throw e; - } - } - - String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY); - long lNewLoad = System.currentTimeMillis() - + ( (null != sAtt) ? (1000 * Integer.parseInt(sAtt)) : 180000); - DomElement[] oaParms = m_oParms.getAllElemChildren(); - - sAtt = m_oParms.getAttr(PARM_POLL_LTCY); - long lPollLtcy = (null != sAtt) ? (1000 * Integer.parseInt(sAtt)) : 20000; - if (lPollLtcy < 3000) - { - lPollLtcy = 3000; - - } - boolean bFirst = true; - while (System.currentTimeMillis() <= lNewLoad) - { - for (DomElement oCurr : oaParms) - { - oneScan(oCurr, bFirst); - } - long lSlack = lNewLoad - System.currentTimeMillis(); - if (lSlack < 0) - { - break; - } - if (waitForQuiesce(Math.min(lSlack, lPollLtcy))) - { - return false; - } - bFirst = false; - } - return true; - } //_________________________________________ - - public void setupSubscribe() throws JMSException, NamingException - { - try - { - String sStopTopic = "topic/quiesce"; - StringBuilder sb = new StringBuilder("processor='DirListener'"); - - String sJndiType = EsbSysProps.getJndiServerType(); - String sJndiServer = EsbSysProps.getJndiServerURL(); - Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiServer); - - Object tmp = oJndiCtx.lookup("UIL2ConnectionFactory"); - TopicConnectionFactory tcf = (TopicConnectionFactory) tmp; - m_oTopicConn = tcf.createTopicConnection(); - m_oTopic = (Topic) oJndiCtx.lookup(sStopTopic); - m_oSession = m_oTopicConn.createTopicSession - (false,TopicSession.AUTO_ACKNOWLEDGE); - m_oTopicConn.start(); - m_oTopicSub = m_oSession.createSubscriber(m_oTopic, sb.toString(),true); - } - catch (Exception e) - { m_oLogger.error("Problems connecting to JMS. ",e); - } - - - } //_________________________________________ - - private boolean waitForQuiesce(long p_lMillis) throws Exception - { - try - { boolean bRec = false; - - if (null != secureReceive(p_lMillis)) - { bRec = true; - m_oLogger.info("Starting Quiesce of Listener. "); - } - else - Thread.sleep(p_lMillis); - return bRec; - - } - catch (Exception e) - { m_oLogger.error("Problems with waitForQuiesce. ",e); - Thread.sleep(p_lMillis); - return false; - } - } //_________________________________________ - - private Object secureReceive(long p_lMillis) throws Exception - { - while (true) - try - { return (null==m_oTopicSub) ? null : m_oTopicSub.receive(p_lMillis); } - catch (JMSException e) - { - // put here your recovery code - return null; - } - - } //_________________________________________ - - - private void oneScan(DomElement p_oP, boolean p_bFirst) throws Exception - { - String sPrcName = p_oP.getName(); - if (!m_omChildPrc.containsKey(sPrcName)) - { - ThreadGroup[] oaCh = new ThreadGroup[m_oThrGrp.activeGroupCount()]; - int iMax = m_oThrGrp.enumerate(oaCh); - - ThreadGroup oThG = null; - for (int i1 = 0; null == oThG && i1 < iMax; i1++) - { if (m_oThrGrp.getName().equals(sPrcName)) - oThG = oaCh[i1]; - } - if (null == oThG) - oThG = new ThreadGroup(sPrcName); - m_omChildPrc.put(sPrcName, new GroupOfChilds(oThG)); - } - GroupOfChilds oCnt = m_omChildPrc.get(sPrcName); - - if (null == oCnt) return; - if (p_bFirst) - oCnt.m_bError = false; - - try - { - oCnt.execute(p_oP); - } - catch (Exception e) - { - oCnt.m_bError = true; - m_oLogger.error("GroupOfChilds.execute", e); - } - } //_________________________________________ - - private class GroupOfChilds implements Observer - { - private ThreadGroup m_oThrGrp; - private boolean m_bError = false; - - private File m_oInpDir; - private FileFilter m_oFFilt; - private Class m_oExecClass; - private DomElement m_oChParms; - private int m_iQthr = 0, m_iMaxThr; - private StringBuilder m_sb; - private int m_iSbIni; - - private GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception - { - m_oThrGrp = p_oThrGrp; - m_sb = new StringBuilder("GroupOfThreads ") - .append(m_oThrGrp.getName()).append(" : "); - m_iSbIni = m_sb.length(); - } //________________________________ - - public void update(Observable p_oObs, Object p_oUsrObj) - { - if (p_oUsrObj instanceof Integer) - { - updQthreads( ( (Integer) p_oUsrObj).intValue()); - } - } //________________________________ - - private synchronized void updQthreads(int p_i) - { - m_iQthr += p_i; - } //________________________________ - - private void execute(DomElement p_oP) throws Exception - { - m_sb.setLength(m_iSbIni); - if (m_bError) - { - m_oLogger.warn(m_sb.append(" Skipping execution due to previous errors") - .toString()); - return; - } - checkParms(p_oP); - - File[] oaF = m_oInpDir.listFiles(m_oFFilt); - - for (File oFcurr : oaF) - { - if (m_iQthr >= m_iMaxThr) - { - m_oLogger.info(m_sb.append("Waiting for available threads").toString()); - Thread.sleep(5000); - break; - } - m_oChParms.rmvChildsByName(EsbAbstractProcessor.PARMS_THIS_INSTANCE); - DomElement oThisProc = new DomElement(EsbAbstractProcessor.PARMS_THIS_INSTANCE); - oThisProc.setAttr(ParamsFileProcessor.FPRC_FILENAME, oFcurr.getName()); - m_oChParms.addElemChild(oThisProc); - - new Thread(m_oThrGrp, - new OneChildProcess(m_oExecClass, this, m_oChParms)).start(); - Thread.sleep(500); - } - } //________________________________ - - protected void checkParms(DomElement p_oP) throws Exception - { - String sAtt = p_oP.getAttr(PARM_MAX_THREADS); - m_iMaxThr = (null == sAtt) ? 1 : Integer.parseInt(sAtt); - if (m_iMaxThr > 10) - { - m_iMaxThr = 10; - } - String sUid = p_oP.getName(); - m_oChParms = p_oP.cloneObj(); - sAtt = m_oChParms.getAttr(PARM_INPUT_DIR); - if (null == sAtt) - { - throw new Exception(m_sb.append("Missing ").append(PARM_INPUT_DIR) - .append(" attribute in -parameters ") - .append(sUid).toString()); - } - m_oInpDir = new File(new URI(sAtt)); - if (!m_oInpDir.isDirectory()) - { - throw new Exception(m_sb.append(sAtt).append(" is not a directory"). - toString()); - } - if (!m_oInpDir.canRead()) - { - throw new Exception(m_sb.append("Can't read directory ").append(sAtt). - toString()); - } - - sAtt = m_oChParms.getAttr(PARM_SUFFIX); - if (null == sAtt) - { - throw new Exception(m_sb.append("Missing ").append(PARM_SUFFIX) - .append(" attribute in -parameters ") - .append(sUid).toString()); - } - - m_oFFilt = new FileEndsWith(sAtt); - - do - { - sAtt = p_oP.getAttr(PARM_FILE_PROCESSOR_CLASS); - m_oExecClass = null; - if (null == sAtt) - { - throw new Exception(m_sb.append("Missing actionClass attribute"). - toString()); - } - try - { - m_oExecClass = Class.forName(sAtt); - } - catch (ClassNotFoundException e) - { - throw new Exception(m_sb.append("Class ").append(sAtt) - .append(" not found in classpath").toString()); - } - try - { - m_oExecClass.getConstructor(new Class[] {DomElement.class}); - } - catch (NoSuchMethodException eN) - { - throw new Exception(m_sb.append("No appropriate constructor") - .append(" (DomElement) found for class").toString()); - } - } - while (false); - } //________________________________ - - private class FileEndsWith implements FileFilter - { - String m_sSuffix; - FileEndsWith(String p_sEnd) throws Exception - { - m_sSuffix = p_sEnd; - if (null == m_sSuffix) - { - throw new Exception("Must specify file extension"); - } - } //_________________________________________ - - public boolean accept(File p_f) - { - if (!p_f.isFile()) - { - return false; - } - return p_f.toString().endsWith(m_sSuffix); - } //_________________________________________ - } //___________________________________________________ - } //______________________________________________________ -} //____________________________________________________________________________ Copied: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/OldDirListener.java (from rev 4902, labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java) =================================================================== --- labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java 2006-07-03 16:58:38 UTC (rev 4902) +++ labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/OldDirListener.java 2006-07-03 20:15:11 UTC (rev 4903) @@ -0,0 +1,440 @@ +/* +* JBoss, Home of Professional Open Source +* Copyright 2006, JBoss Inc., and individual contributors as indicated +* by the @authors tag. See the copyright.txt in the distribution for a +* full listing of individual contributors. +* +* This is free software; you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as +* published by the Free Software Foundation; either version 2.1 of +* the License, or (at your option) any later version. +* +* This software is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +* Lesser General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public +* License along with this software; if not, write to the Free +* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA +* 02110-1301 USA, or see the FSF site: http://www.fsf.org. +*/ + + +package org.jboss.soa.esb.listeners; + +import java.io.*; +import java.lang.reflect.Constructor; +import java.net.*; +import java.util.*; + +import org.apache.log4j.*; + +import javax.naming.*; +import javax.jms.*; + +import org.jboss.soa.esb.util.*; +import org.jboss.soa.esb.common.*; +import org.jboss.soa.esb.helpers.*; +import org.jboss.soa.esb.parameters.*; +import org.jboss.soa.esb.processors.*; + +//import org.jboss.soa.esb.nagios.*; + +public class OldDirListener +{ + public static void main(String[] args) throws Exception + { + new OldDirListener(args[0]); + } //________________________________ + + public static final String PARM_POLL_LTCY = "pollLatency"; + public static final String PARM_RELOAD_LTCY = "reloadLatency"; + + public static final String PARM_MAX_THREADS = "maxThreads"; + public static final String PARM_FILE_PROCESSOR_CLASS = "fileProcessorClass"; + + public static final String PARM_INPUT_DIR = "inputDirURI"; + public static final String PARM_SUFFIX = "inputSuffix"; + + private Map<String,GroupOfChilds> m_omChildPrc + = new HashMap<String,GroupOfChilds>(); + + private ThreadGroup m_oThrGrp = Thread.currentThread().getThreadGroup(); + + private Logger m_oLogger; + private DomElement m_oParms; + private ParamsRepository m_oParmRepos; + + private TopicConnection m_oTopicConn = null; + private TopicSession m_oSession = null; + private Topic m_oTopic = null; + private TopicSubscriber m_oTopicSub = null; + + public OldDirListener(String p_sParamsUid) throws Exception + { + m_oLogger = EsbUtil.getDefaultLogger(this.getClass()); +// setupSubscribe(); + /* + * Removing Nagios integration + try + { + String sNagServer = System.getProperty("jbossEsb.nagios.server"); + if (sNagServer != null) + { + int iNagPort = Integer.parseInt + (System.getProperty("jbossEsb.nagios.port","5667")); + String sNagService = + System.getProperty("jbossEsb.nagios.service"); + new NagiosStandaloneHeartbeat( + sNagServer, iNagPort, sNagService, "rosetta-listener-service"); + } + } + catch (Exception eNagios) + { + m_oLogger.info("Problems with Nagios Notification", eNagios); + } + */ + + String sFactoryClass = EsbSysProps.getParamsReposFactoryClass(); + m_oParmRepos = ParamsReposUtil.reposFromFactory(sFactoryClass,null); + + Name oParms = ParamsReposUtil.nameFromString(p_sParamsUid); + while (loadParmsCycle(oParms)) + { + } + } //__________________________________ + + protected boolean loadParmsCycle(Name p_oParams) throws Exception + { + String sMsg = (null == m_oParms) + ? "Initial Parameter loading" : "Reloading Params"; + m_oLogger.info(sMsg); + + try + { + m_oParms = m_oParmRepos.getElement(p_oParams); + } + catch (Exception e) + { + m_oLogger.warn("Failed to load parameters"); + if (null == m_oParms) + { + throw e; + } + } + + String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY); + long lNewLoad = System.currentTimeMillis() + + ( (null != sAtt) ? (1000 * Integer.parseInt(sAtt)) : 180000); + DomElement[] oaParms = m_oParms.getAllElemChildren(); + + sAtt = m_oParms.getAttr(PARM_POLL_LTCY); + long lPollLtcy = (null != sAtt) ? (1000 * Integer.parseInt(sAtt)) : 20000; + if (lPollLtcy < 3000) + { + lPollLtcy = 3000; + + } + boolean bFirst = true; + while (System.currentTimeMillis() <= lNewLoad) + { + for (DomElement oCurr : oaParms) + { + oneScan(oCurr, bFirst); + } + long lSlack = lNewLoad - System.currentTimeMillis(); + if (lSlack < 0) + { + break; + } + if (waitForQuiesce(Math.min(lSlack, lPollLtcy))) + { + return false; + } + bFirst = false; + } + return true; + } //_________________________________________ + + public void setupSubscribe() throws JMSException, NamingException + { + try + { + String sStopTopic = "topic/quiesce"; + StringBuilder sb = new StringBuilder("processor='OldDirListener'"); + + String sJndiType = EsbSysProps.getJndiServerType(); + String sJndiServer = EsbSysProps.getJndiServerURL(); + Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiServer); + + Object tmp = oJndiCtx.lookup("UIL2ConnectionFactory"); + TopicConnectionFactory tcf = (TopicConnectionFactory) tmp; + m_oTopicConn = tcf.createTopicConnection(); + m_oTopic = (Topic) oJndiCtx.lookup(sStopTopic); + m_oSession = m_oTopicConn.createTopicSession + (false,TopicSession.AUTO_ACKNOWLEDGE); + m_oTopicConn.start(); + m_oTopicSub = m_oSession.createSubscriber(m_oTopic, sb.toString(),true); + } + catch (Exception e) + { m_oLogger.error("Problems connecting to JMS. ",e); + } + + + } //_________________________________________ + + private boolean waitForQuiesce(long p_lMillis) throws Exception + { + try + { boolean bRec = false; + + if (null != secureReceive(p_lMillis)) + { bRec = true; + m_oLogger.info("Starting Quiesce of Listener. "); + } + else + Thread.sleep(p_lMillis); + return bRec; + + } + catch (Exception e) + { m_oLogger.error("Problems with waitForQuiesce. ",e); + Thread.sleep(p_lMillis); + return false; + } + } //_________________________________________ + + private Object secureReceive(long p_lMillis) throws Exception + { + while (true) + try + { return (null==m_oTopicSub) ? null : m_oTopicSub.receive(p_lMillis); } + catch (JMSException e) + { + // put here your recovery code + return null; + } + + } //_________________________________________ + + + private void oneScan(DomElement p_oP, boolean p_bFirst) throws Exception + { + String sPrcName = p_oP.getName(); + if (!m_omChildPrc.containsKey(sPrcName)) + { + ThreadGroup[] oaCh = new ThreadGroup[m_oThrGrp.activeGroupCount()]; + int iMax = m_oThrGrp.enumerate(oaCh); + + ThreadGroup oThG = null; + for (int i1 = 0; null == oThG && i1 < iMax; i1++) + { if (m_oThrGrp.getName().equals(sPrcName)) + oThG = oaCh[i1]; + } + if (null == oThG) + oThG = new ThreadGroup(sPrcName); + m_omChildPrc.put(sPrcName, new GroupOfChilds(oThG)); + } + GroupOfChilds oCnt = m_omChildPrc.get(sPrcName); + + if (null == oCnt) return; + if (p_bFirst) + oCnt.m_bError = false; + + try + { + oCnt.execute(p_oP); + } + catch (Exception e) + { + oCnt.m_bError = true; + m_oLogger.error("GroupOfChilds.execute", e); + } + } //_________________________________________ + + private class GroupOfChilds implements Observer + { + private ThreadGroup m_oThrGrp; + private boolean m_bError = false; + + private File m_oInpDir; + private FileFilter m_oFFilt; + private Class m_oExecClass; + private DomElement m_oChParms; + private int m_iQthr = 0, m_iMaxThr; + private StringBuilder m_sb; + private int m_iSbIni; + + private GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception + { + m_oThrGrp = p_oThrGrp; + m_sb = new StringBuilder("GroupOfThreads ") + .append(m_oThrGrp.getName()).append(" : "); + m_iSbIni = m_sb.length(); + } //________________________________ + + public void update(Observable p_oObs, Object p_oUsrObj) + { + if (p_oUsrObj instanceof Integer) + { + updQthreads( ( (Integer) p_oUsrObj).intValue()); + } + } //________________________________ + + private synchronized void updQthreads(int p_i) + { + m_iQthr += p_i; + } //________________________________ + + private void execute(DomElement p_oP) throws Exception + { + m_sb.setLength(m_iSbIni); + if (m_bError) + { + m_oLogger.warn(m_sb.append(" Skipping execution due to previous errors") + .toString()); + return; + } + checkParms(p_oP); + + File[] oaF = m_oInpDir.listFiles(m_oFFilt); + + for (File oFcurr : oaF) + { + if (m_iQthr >= m_iMaxThr) + { + m_oLogger.info(m_sb.append("Waiting for available threads").toString()); + Thread.sleep(5000); + break; + } + m_oChParms.rmvChildsByName(EsbAbstractProcessor.PARMS_THIS_INSTANCE); + DomElement oThisProc = new DomElement(EsbAbstractProcessor.PARMS_THIS_INSTANCE); + oThisProc.setAttr(ParamsFileProcessor.FPRC_FILENAME, oFcurr.getName()); + m_oChParms.addElemChild(oThisProc); + + new Thread(m_oThrGrp, + new ChildProcess(m_oExecClass, this, m_oChParms)).start(); + Thread.sleep(500); + } + } //________________________________ + + protected void checkParms(DomElement p_oP) throws Exception + { + String sAtt = p_oP.getAttr(PARM_MAX_THREADS); + m_iMaxThr = (null == sAtt) ? 1 : Integer.parseInt(sAtt); + if (m_iMaxThr > 10) + { + m_iMaxThr = 10; + } + String sUid = p_oP.getName(); + m_oChParms = p_oP.cloneObj(); + sAtt = m_oChParms.getAttr(PARM_INPUT_DIR); + if (null == sAtt) + { + throw new Exception(m_sb.append("Missing ").append(PARM_INPUT_DIR) + .append(" attribute in -parameters ") + .append(sUid).toString()); + } + m_oInpDir = new File(new URI(sAtt)); + if (!m_oInpDir.isDirectory()) + { + throw new Exception(m_sb.append(sAtt).append(" is not a directory"). + toString()); + } + if (!m_oInpDir.canRead()) + { + throw new Exception(m_sb.append("Can't read directory ").append(sAtt). + toString()); + } + + sAtt = m_oChParms.getAttr(PARM_SUFFIX); + if (null == sAtt) + { + throw new Exception(m_sb.append("Missing ").append(PARM_SUFFIX) + .append(" attribute in -parameters ") + .append(sUid).toString()); + } + + m_oFFilt = new FileEndsWith(sAtt); + + do + { + sAtt = p_oP.getAttr(PARM_FILE_PROCESSOR_CLASS); + m_oExecClass = null; + if (null == sAtt) + { + throw new Exception(m_sb.append("Missing actionClass attribute"). + toString()); + } + try + { + m_oExecClass = Class.forName(sAtt); + } + catch (ClassNotFoundException e) + { + throw new Exception(m_sb.append("Class ").append(sAtt) + .append(" not found in classpath").toString()); + } + try + { + m_oExecClass.getConstructor(new Class[] {DomElement.class}); + } + catch (NoSuchMethodException eN) + { + throw new Exception(m_sb.append("No appropriate constructor") + .append(" (DomElement) found for class").toString()); + } + } + while (false); + } //________________________________ + + private class FileEndsWith implements FileFilter + { + String m_sSuffix; + FileEndsWith(String p_sEnd) throws Exception + { + m_sSuffix = p_sEnd; + if (null == m_sSuffix) + { + throw new Exception("Must specify file extension"); + } + } //_________________________________________ + + public boolean accept(File p_f) + { + if (!p_f.isFile()) + { + return false; + } + return p_f.toString().endsWith(m_sSuffix); + } //_________________________________________ + } //___________________________________________________ + } //______________________________________________________ + + private static class ChildProcess extends Observable implements Runnable + { private Class m_oExecClass; + private DomElement m_oParms; + private Logger m_oLogger; + public ChildProcess(Class p_oExec, Observer p_oObs, DomElement p_oP) + { m_oLogger = EsbUtil.getDefaultLogger(this.getClass()); + m_oExecClass = p_oExec; + this.addObserver(p_oObs); + m_oParms = p_oP; + setChanged(); + notifyObservers(new Integer(1)); + } //__________________________________ + + public void run() + { try + { Constructor oCnst = m_oExecClass.getConstructor(new Class[] {DomElement.class}); + Object oProc = oCnst.newInstance(new Object[] {m_oParms}); + ((EsbFileProcessor)oProc).execute(); + } + catch (Exception e) { m_oLogger.error("run() FAILED",e); } + + setChanged(); + notifyObservers(new Integer(-1)); + } //__________________________________ + } //____________________________________________________________________________ +} //____________________________________________________________________________ Deleted: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/OneChildProcess.java =================================================================== --- labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/OneChildProcess.java 2006-07-03 16:58:38 UTC (rev 4902) +++ labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/OneChildProcess.java 2006-07-03 20:15:11 UTC (rev 4903) @@ -1,59 +0,0 @@ -/* -* JBoss, Home of Professional Open Source -* Copyright 2006, JBoss Inc., and individual contributors as indicated -* by the @authors tag. See the copyright.txt in the distribution for a -* full listing of individual contributors. -* -* This is free software; you can redistribute it and/or modify it -* under the terms of the GNU Lesser General Public License as -* published by the Free Software Foundation; either version 2.1 of -* the License, or (at your option) any later version. -* -* This software is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -* Lesser General Public License for more details. -* -* You should have received a copy of the GNU Lesser General Public -* License along with this software; if not, write to the Free -* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA -* 02110-1301 USA, or see the FSF site: http://www.fsf.org. -*/ - - -package org.jboss.soa.esb.listeners; - -import java.util.*; -import java.lang.reflect.*; -import org.apache.log4j.*; - -import org.jboss.soa.esb.helpers.*; -import org.jboss.soa.esb.processors.*; -import org.jboss.soa.esb.util.EsbUtil; - - -public class OneChildProcess extends Observable implements Runnable -{ private Class m_oExecClass; - private DomElement m_oParms; - private Logger m_oLogger; - public OneChildProcess(Class p_oExec, Observer p_oObs, DomElement p_oP) - { m_oLogger = EsbUtil.getDefaultLogger(this.getClass()); - m_oExecClass = p_oExec; - this.addObserver(p_oObs); - m_oParms = p_oP; - setChanged(); - notifyObservers(new Integer(1)); - } //__________________________________ - - public void run() - { try - { Constructor oCnst = m_oExecClass.getConstructor(new Class[] {DomElement.class}); - Object oProc = oCnst.newInstance(new Object[] {m_oParms}); - ((EsbFileProcessor)oProc).execute(); - } - catch (Exception e) { m_oLogger.error("run() FAILED",e); } - - setChanged(); - notifyObservers(new Integer(-1)); - } //__________________________________ -} //____________________________________________________________________________ |