From: <jbo...@li...> - 2006-06-30 18:44:16
|
Author: estebanschifman Date: 2006-06-30 14:44:10 -0400 (Fri, 30 Jun 2006) New Revision: 4890 Added: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java Log: Better version of the DirListener class This one extends AbstractPoller. Several methods and parameters of the old DirListener have been moved to AbstractPoller Added: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java =================================================================== --- labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java 2006-06-30 18:43:05 UTC (rev 4889) +++ labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java 2006-06-30 18:44:10 UTC (rev 4890) @@ -0,0 +1,215 @@ +/* +* JBoss, Home of Professional Open Source +* Copyright 2006, JBoss Inc., and individual contributors as indicated +* by the @authors tag. See the copyright.txt in the distribution for a +* full listing of individual contributors. +* +* This is free software; you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as +* published by the Free Software Foundation; either version 2.1 of +* the License, or (at your option) any later version. +* +* This software is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +* Lesser General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public +* License along with this software; if not, write to the Free +* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA +* 02110-1301 USA, or see the FSF site: http://www.fsf.org. 
*/

package org.jboss.soa.esb.listeners;

import java.io.*;
import java.net.*;
import org.jboss.soa.esb.util.*;
import org.jboss.soa.esb.helpers.*;
import org.jboss.soa.esb.processors.*;

/**
 * Directory listener built on {@link AbstractPoller}.
 * <p>
 * On every scan it lists the files of the directory named by the
 * {@link #PARM_INPUT_DIR} attribute whose names end with the
 * {@link #PARM_SUFFIX} attribute, and starts one thread per file running a
 * <code>OneChildProcess</code> for the class named by the
 * {@link #PARM_FILE_PROCESSOR_CLASS} attribute.
 */
public class BetterDirListener extends AbstractPoller
{
  public static final String PARM_FILE_PROCESSOR_CLASS = "fileProcessorClass";
  public static final String PARM_INPUT_DIR = "inputDirURI";
  public static final String PARM_SUFFIX = "inputSuffix";

  /** Length of one cycle when the reload latency attribute is absent. */
  private static final long DEFAULT_RELOAD_MILLIS = 180000L;
  /** Pause between scans when the poll latency attribute is absent (20 secs). */
  private static final long DEFAULT_POLL_MILLIS = 20000L;
  /** Lower bound for the pause between scans, so we don't poll too often. */
  private static final long MIN_POLL_MILLIS = 3000L;

  /**
   * Command line entry point.
   *
   * @param args <code>args[0]</code> is handed to
   *             {@link #BetterDirListener(String)}
   * @throws IllegalArgumentException if no argument was supplied
   * @throws Exception whatever the constructor throws
   */
  public static void main(String[] args) throws Exception
  {
    if (null == args || args.length < 1)
    {
      throw new IllegalArgumentException(
          "Usage: BetterDirListener <paramsUid>");
    }
    new BetterDirListener(args[0]);
  } //________________________________

  /**
   * Builds the listener and immediately starts running it.
   *
   * @param p_sParamsUid passed unchanged to the superclass constructor
   * @throws Exception propagated from the superclass constructor or from
   *                   <code>runUntilEndRequested()</code>
   */
  public BetterDirListener(String p_sParamsUid) throws Exception
  {
    super(p_sParamsUid);
    // See superclass - It provides ability to request end by subscribing to a Topic
    // NOTE(review): presumably this call does not return until an end is
    // requested, i.e. the constructor blocks - confirm in AbstractPoller
    runUntilEndRequested();
  } //__________________________________

  /**
   * Runs one cycle: scans every child element of the parameters repeatedly,
   * pausing between scans, until the reload latency has elapsed or
   * <code>waitForQuiesce</code> reports true.
   * <p>
   * Both latency attributes are expressed in seconds.
   *
   * @throws NumberFormatException if a latency attribute is not an integer
   * @throws Exception propagated from <code>oneScan</code> / <code>waitForQuiesce</code>
   */
  @Override
  protected void executeOneCycle() throws Exception
  {
    String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY);
    // 1000L: do the multiplication in long so large values can't overflow int
    long lNewLoad = System.currentTimeMillis()
        + ((null != sAtt) ? (1000L * Integer.parseInt(sAtt)) : DEFAULT_RELOAD_MILLIS);
    DomElement[] oaParms = m_oParms.getAllElemChildren();

    sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
    long lPollLtcy = (null != sAtt)
        ? (1000L * Integer.parseInt(sAtt))
        : DEFAULT_POLL_MILLIS;
    if (lPollLtcy < MIN_POLL_MILLIS)
    {
      lPollLtcy = MIN_POLL_MILLIS; // but not too often
    }

    boolean bFirst = true;
    while (System.currentTimeMillis() <= lNewLoad)
    {
      for (DomElement oCurr : oaParms)
      {
        super.oneScan(oCurr, bFirst);
      }
      long lSlack = lNewLoad - System.currentTimeMillis();
      // Nothing left to wait for. Also avoids waitForQuiesce(0)
      // NOTE(review): if waitForQuiesce is built on Object.wait, a zero
      // timeout would mean "wait forever" - confirm in AbstractPoller
      if (lSlack <= 0)
      {
        break;
      }
      if (waitForQuiesce(Math.min(lSlack, lPollLtcy)))
      {
        m_bEndRequested = true;
        return;
      }
      bFirst = false;
    }
  } //_________________________________________

  /**
   * @param pThG thread group handed to the new child group
   * @return a new {@link MyChildGroup}
   */
  @Override
  protected GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception
  {
    return new MyChildGroup(pThG);
  } //_________________________________________

  /**
   * Child group that launches one processing thread per matching file found
   * in the configured input directory.
   */
  private class MyChildGroup extends AbstractPoller.GroupOfChilds
  {
    /** Directory to scan; set by {@link #checkParms(DomElement)}. */
    private File m_oInpDir;
    /** Accepts the files to process; set by {@link #checkParms(DomElement)}. */
    private FileFilter m_oFFilt;

    private MyChildGroup(ThreadGroup p_oThrGrp) throws Exception
    {
      super(p_oThrGrp);
    } //________________________________

    /**
     * Validates the parameters, lists the input directory and starts a
     * thread for each accepted file. When the thread limit is reached it
     * logs, sleeps 5 seconds and leaves the remaining files for a later call.
     *
     * @param p_oP parameters for this scan
     * @throws Exception on invalid parameters, if the directory cannot be
     *                   listed, or if a sleep is interrupted
     */
    @Override
    protected void doYourJob(DomElement p_oP) throws Exception
    {
      m_sb.setLength(m_iSbIni);
      if (m_bError)
      {
        m_oLogger.warn(formatLogMsg(" Skipping execution due to previous errors"));
        return;
      }
      checkParms(p_oP);

      File[] oaF = m_oInpDir.listFiles(m_oFFilt);
      if (null == oaF)
      {
        // listFiles() returns null (not an empty array) on I/O errors or
        // if the directory vanished after checkParms() validated it
        throw new Exception(formatLogMsg(
            m_sb.append("Unable to list files in directory ")
                .append(m_oInpDir.getPath()).toString()));
      }

      for (File oFcurr : oaF)
      {
        if (m_iQthr >= m_iMaxThr)
        {
          m_oLogger.info(m_sb.append("Waiting for available threads").toString());
          Thread.sleep(5000);
          break;
        }
        // Replace the per-instance element with one describing this file
        // NOTE(review): m_oChParms is one shared object that is mutated here
        // and also handed to every child thread; a child that reads it after
        // the next iteration would see another file's name - confirm that
        // OneChildProcess copies what it needs on construction
        m_oChParms.rmvChildsByName(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
        DomElement oThisProc = new DomElement(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
        oThisProc.setAttr(ParamsFileProcessor.FPRC_FILENAME, oFcurr.getName());
        m_oChParms.addElemChild(oThisProc);

        new Thread(m_oThrGrp,
            new OneChildProcess(m_oExecClass, this, m_oChParms)).start();
        Thread.sleep(500);
      }
    } //________________________________

    /**
     * Runs the superclass checks, then resolves the input directory, the
     * file suffix filter and the processor class.
     *
     * @param p_oP parameters for this scan
     * @throws Exception if an attribute is missing, the directory is not a
     *                   readable directory, or the processor class cannot be
     *                   loaded or lacks a <code>(DomElement)</code> constructor
     */
    protected void checkParms(DomElement p_oP) throws Exception
    {
      super.checkParms(p_oP);

      String sAtt = m_oChParms.getAttr(PARM_INPUT_DIR);
      if (null == sAtt)
      {
        throw new Exception(formatLogMsg(
            m_sb.append("Missing ").append(PARM_INPUT_DIR)
                .append(" attribute in -parameters ")
                .toString()));
      }
      m_oInpDir = new File(new URI(sAtt));
      if (!m_oInpDir.isDirectory())
      {
        throw new Exception(formatLogMsg(
            m_sb.append(sAtt).append(" is not a directory").toString()));
      }
      if (!m_oInpDir.canRead())
      {
        throw new Exception(formatLogMsg(
            m_sb.append("Can't read directory ").append(sAtt).toString()));
      }

      sAtt = m_oChParms.getAttr(PARM_SUFFIX);
      if (null == sAtt)
      {
        throw new Exception(formatLogMsg(
            m_sb.append("Missing ").append(PARM_SUFFIX)
                .append(" attribute in -parameters ")
                .toString()));
      }
      m_oFFilt = new FileEndsWith(sAtt);

      sAtt = p_oP.getAttr(PARM_FILE_PROCESSOR_CLASS);
      // Cleared first so a failed check never leaves a stale class behind
      m_oExecClass = null;
      if (null == sAtt)
      {
        throw new Exception(formatLogMsg(
            m_sb.append("Missing ").append(PARM_FILE_PROCESSOR_CLASS)
                .append(" attribute").toString()));
      }
      try
      {
        m_oExecClass = Class.forName(sAtt);
      }
      catch (ClassNotFoundException e)
      {
        throw new Exception(formatLogMsg(
            m_sb.append("Class ").append(sAtt)
                .append(" not found in classpath").toString()), e);
      }
      try
      {
        m_oExecClass.getConstructor(new Class[] {DomElement.class});
      }
      catch (NoSuchMethodException eN)
      {
        throw new Exception(formatLogMsg(
            m_sb.append("No appropriate constructor")
                .append(" (DomElement) found for class ")
                .append(sAtt).toString()), eN);
      }
    } //________________________________

    /**
     * Accepts regular files whose path string ends with a given suffix.
     */
    private class FileEndsWith implements FileFilter
    {
      private final String m_sSuffix;

      /**
       * @param p_sEnd suffix a file must end with
       * @throws Exception if <code>EsbUtil.isNullString</code> rejects the suffix
       */
      FileEndsWith(String p_sEnd) throws Exception
      {
        if (EsbUtil.isNullString(p_sEnd))
        {
          throw new Exception("Must specify file extension");
        }
        m_sSuffix = p_sEnd;
      } //_________________________________________

      public boolean accept(File p_f)
      {
        return p_f.isFile() && p_f.toString().endsWith(m_sSuffix);
      } //_________________________________________
    } //___________________________________________________
  } //______________________________________________________

} //____________________________________________________________________________