|
From: <jbo...@li...> - 2006-07-03 20:15:19
|
Author: estebanschifman
Date: 2006-07-03 16:15:11 -0400 (Mon, 03 Jul 2006)
New Revision: 4903
Added:
labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/OldDirListener.java
Removed:
labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java
labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/OneChildProcess.java
Modified:
labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java
Log:
Renamed old 'DirListener' to 'OldDirListener'
Got rid of the stand alone OneChildProcess class. It's now an inner class
Moved some methods/variables from BetterListener to AbstractPoller
Modified: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-07-03 16:58:38 UTC (rev 4902)
+++ labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-07-03 20:15:11 UTC (rev 4903)
@@ -23,6 +23,7 @@
package org.jboss.soa.esb.listeners;
+import java.lang.reflect.Constructor;
import java.util.*;
import org.apache.log4j.*;
@@ -34,13 +35,23 @@
import org.jboss.soa.esb.common.*;
import org.jboss.soa.esb.helpers.*;
import org.jboss.soa.esb.parameters.*;
+import org.jboss.soa.esb.processors.EsbFileProcessor;
public abstract class AbstractPoller
{
- protected abstract void executeOneCycle()throws Exception;
protected abstract GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception;
+ // You can override these three values at constructor time of your
+ // derived class after calling super(String)
+ protected int m_iMinPollMillis = 3000 // minimum polling interval
+ ,m_iDfltPollMillis = 20000 // default polling interval
+ ,m_iDfltReloadMillis= 180000 // default interval between
+ // parameter reloading
+ ;
+
+ public static final String PARM_ACTION_CLASS = "actionClass";
+
public static final String PARM_POLL_LTCY = "pollLatency";
public static final String PARM_RELOAD_LTCY = "reloadLatency";
@@ -63,6 +74,7 @@
protected TopicSession m_oSession;
protected Topic m_oTopic;
protected TopicSubscriber m_oTopicSubs;
+
protected AbstractPoller(String p_sParamsUid) throws Exception
@@ -97,6 +109,45 @@
}
} //__________________________________
+ private void executeOneCycle() throws Exception
+ {
+ String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY);
+ long lNewLoad = System.currentTimeMillis()
+ + ( (null != sAtt)
+ ? (1000 * Integer.parseInt(sAtt))
+ : m_iDfltReloadMillis
+ );
+
+ DomElement[] oaParms = m_oParms.getAllElemChildren();
+
+ sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
+ long lPollLtcy = (null != sAtt)
+ ? (1000 * Integer.parseInt(sAtt))
+ : m_iDfltPollMillis;
+
+ if (lPollLtcy < m_iMinPollMillis)
+ lPollLtcy = m_iMinPollMillis;
+
+ boolean bFirst = true;
+ while (System.currentTimeMillis() <= lNewLoad)
+ {
+ for (DomElement oCurr : oaParms)
+ {
+ oneScan(oCurr, bFirst);
+ }
+ long lSlack = lNewLoad - System.currentTimeMillis();
+ if (lSlack < 0)
+ {
+ break;
+ }
+ if (waitForQuiesce(Math.min(lSlack, lPollLtcy)))
+ { m_bEndRequested = true;
+ return;
+ }
+ bFirst = false;
+ }
+ } //_________________________________________
+
protected String formatLogMsg(String p_s)
{ return new StringBuilder("Processor '")
.append(EsbUtil.classSuffix(this.getClass())).append("' <")
@@ -220,14 +271,14 @@
public static final String PARM_MAX_THREADS = "maxThreads";
- protected ThreadGroup m_oThrGrp;
- protected boolean m_bError = false;
+ protected ThreadGroup m_oThrGrp;
+ protected boolean m_bError = false;
protected Class m_oExecClass;
protected DomElement m_oChParms;
protected int m_iQthr = 0, m_iMaxThr;
- protected StringBuilder m_sb;
+ protected StringBuilder m_sb;
protected int m_iSbIni;
protected GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception
@@ -280,10 +331,37 @@
// and to add REAL parameter checking
protected void checkParms(DomElement p_oP) throws Exception
{
- m_sb.setLength(0);
+ m_sb.setLength(m_iSbIni);
m_oChParms = p_oP.cloneObj();
setMaxThreads(p_oP,10);
} //________________________________
- } //______________________________________________________
+ } //______________________________________________________
+
+ protected static class ChildProcess extends Observable implements Runnable
+ { private Class m_oExecClass;
+ private DomElement m_oParms;
+ private Logger m_oLogger;
+ public ChildProcess(Class p_oExec, Observer p_oObs, DomElement p_oP)
+ { m_oLogger = EsbUtil.getDefaultLogger(this.getClass());
+ m_oExecClass = p_oExec;
+ this.addObserver(p_oObs);
+ m_oParms = p_oP;
+ setChanged();
+ notifyObservers(new Integer(1));
+ } //__________________________________
+
+ public void run()
+ { try
+ { Constructor oCnst = m_oExecClass.getConstructor(new Class[] {DomElement.class});
+ Object oProc = oCnst.newInstance(new Object[] {m_oParms});
+ ((EsbFileProcessor)oProc).execute();
+ }
+ catch (Exception e) { m_oLogger.error("run() FAILED",e); }
+
+ setChanged();
+ notifyObservers(new Integer(-1));
+ } //__________________________________
+ } //______________________________________________________
+
} //____________________________________________________________________________
Modified: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java
===================================================================
--- labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java 2006-07-03 16:58:38 UTC (rev 4902)
+++ labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/BetterDirListener.java 2006-07-03 20:15:11 UTC (rev 4903)
@@ -36,54 +36,16 @@
new BetterDirListener(args[0]);
} //________________________________
- public static final String PARM_FILE_PROCESSOR_CLASS = "fileProcessorClass";
- public static final String PARM_INPUT_DIR = "inputDirURI";
- public static final String PARM_SUFFIX = "inputSuffix";
-
public BetterDirListener(String p_sParamsUid) throws Exception
{
super(p_sParamsUid);
+ m_iDfltReloadMillis = 180000;
+ m_iDfltPollMillis = 20000;
+ m_iMinPollMillis = 5000;
// See superclass - It provides ability to request end by subscribing to a Topic
runUntilEndRequested();
} //__________________________________
- @Override
- protected void executeOneCycle() throws Exception
- {
- String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY);
- long lNewLoad = System.currentTimeMillis()
- + ( (null != sAtt) ? (1000 * Integer.parseInt(sAtt)) : 180000);
- DomElement[] oaParms = m_oParms.getAllElemChildren();
-
- sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
- long lPollLtcy = (null != sAtt)
- ? (1000 * Integer.parseInt(sAtt))
- : 20000; // if poll latency was not there, do it every 20 secs
-
- if (lPollLtcy < 3000)
- lPollLtcy = 3000; // but not too often
-
- boolean bFirst = true;
- while (System.currentTimeMillis() <= lNewLoad)
- {
- for (DomElement oCurr : oaParms)
- {
- super.oneScan(oCurr, bFirst);
- }
- long lSlack = lNewLoad - System.currentTimeMillis();
- if (lSlack < 0)
- {
- break;
- }
- if (waitForQuiesce(Math.min(lSlack, lPollLtcy)))
- { m_bEndRequested = true;
- return;
- }
- bFirst = false;
- }
- } //_________________________________________
-
- @Override
protected GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception
{
return new MyChildGroup(pThG);
@@ -91,6 +53,10 @@
private class MyChildGroup extends AbstractPoller.GroupOfChilds
{
+ public static final String PARM_FILE_PROCESSOR_CLASS = "fileProcessorClass";
+ public static final String PARM_INPUT_DIR = "inputDirURI";
+ public static final String PARM_SUFFIX = "inputSuffix";
+
private File m_oInpDir;
private FileFilter m_oFFilt;
@@ -100,14 +66,7 @@
@Override
protected void doYourJob(DomElement p_oP) throws Exception
- { m_sb.setLength(m_iSbIni);
- if (m_bError)
- {
- m_oLogger.warn(formatLogMsg(" Skipping execution due to previous errors"));
- return;
- }
- checkParms(p_oP);
-
+ {
File[] oaF = m_oInpDir.listFiles(m_oFFilt);
for (File oFcurr : oaF)
@@ -124,17 +83,16 @@
m_oChParms.addElemChild(oThisProc);
new Thread(m_oThrGrp,
- new OneChildProcess(m_oExecClass, this, m_oChParms)).start();
+ new ChildProcess(m_oExecClass, this, m_oChParms)).start();
Thread.sleep(500);
}
- }
+ } //________________________________
protected void checkParms(DomElement p_oP) throws Exception
{
super.checkParms(p_oP);
- String sAtt = p_oP.getAttr(PARM_MAX_THREADS);
- sAtt = m_oChParms.getAttr(PARM_INPUT_DIR);
+ String sAtt = m_oChParms.getAttr(PARM_INPUT_DIR);
if (null == sAtt)
{ throw new Exception(formatLogMsg(
m_sb.append("Missing ").append(PARM_INPUT_DIR)
@@ -168,7 +126,7 @@
m_oExecClass = null;
if (null == sAtt)
{ throw new Exception(formatLogMsg(
- m_sb.append("Missing actionClass attribute").
+ m_sb.append("Missing fileProcessorClass attribute").
toString()));
}
try
Deleted: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java
===================================================================
--- labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java 2006-07-03 16:58:38 UTC (rev 4902)
+++ labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java 2006-07-03 20:15:11 UTC (rev 4903)
@@ -1,411 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
-
-
-package org.jboss.soa.esb.listeners;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import org.apache.log4j.*;
-import javax.naming.*;
-import javax.jms.*;
-
-import org.jboss.soa.esb.util.*;
-import org.jboss.soa.esb.common.*;
-import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.parameters.*;
-import org.jboss.soa.esb.processors.*;
-
-//import org.jboss.soa.esb.nagios.*;
-
-public class DirListener
-{
- public static void main(String[] args) throws Exception
- {
- new DirListener(args[0]);
- } //________________________________
-
- public static final String PARM_POLL_LTCY = "pollLatency";
- public static final String PARM_RELOAD_LTCY = "reloadLatency";
-
- public static final String PARM_MAX_THREADS = "maxThreads";
- public static final String PARM_FILE_PROCESSOR_CLASS = "fileProcessorClass";
-
- public static final String PARM_INPUT_DIR = "inputDirURI";
- public static final String PARM_SUFFIX = "inputSuffix";
-
- private Map<String,GroupOfChilds> m_omChildPrc
- = new HashMap<String,GroupOfChilds>();
-
- private ThreadGroup m_oThrGrp = Thread.currentThread().getThreadGroup();
-
- private Logger m_oLogger;
- private DomElement m_oParms;
- private ParamsRepository m_oParmRepos;
-
- private TopicConnection m_oTopicConn = null;
- private TopicSession m_oSession = null;
- private Topic m_oTopic = null;
- private TopicSubscriber m_oTopicSub = null;
-
- public DirListener(String p_sParamsUid) throws Exception
- {
- m_oLogger = EsbUtil.getDefaultLogger(this.getClass());
-// setupSubscribe();
- /*
- * Removing Nagios integration
- try
- {
- String sNagServer = System.getProperty("jbossEsb.nagios.server");
- if (sNagServer != null)
- {
- int iNagPort = Integer.parseInt
- (System.getProperty("jbossEsb.nagios.port","5667"));
- String sNagService =
- System.getProperty("jbossEsb.nagios.service");
- new NagiosStandaloneHeartbeat(
- sNagServer, iNagPort, sNagService, "rosetta-listener-service");
- }
- }
- catch (Exception eNagios)
- {
- m_oLogger.info("Problems with Nagios Notification", eNagios);
- }
- */
-
- String sFactoryClass = EsbSysProps.getParamsReposFactoryClass();
- m_oParmRepos = ParamsReposUtil.reposFromFactory(sFactoryClass,null);
-
- Name oParms = ParamsReposUtil.nameFromString(p_sParamsUid);
- while (loadParmsCycle(oParms))
- {
- }
- } //__________________________________
-
- protected boolean loadParmsCycle(Name p_oParams) throws Exception
- {
- String sMsg = (null == m_oParms)
- ? "Initial Parameter loading" : "Reloading Params";
- m_oLogger.info(sMsg);
-
- try
- {
- m_oParms = m_oParmRepos.getElement(p_oParams);
- }
- catch (Exception e)
- {
- m_oLogger.warn("Failed to load parameters");
- if (null == m_oParms)
- {
- throw e;
- }
- }
-
- String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY);
- long lNewLoad = System.currentTimeMillis()
- + ( (null != sAtt) ? (1000 * Integer.parseInt(sAtt)) : 180000);
- DomElement[] oaParms = m_oParms.getAllElemChildren();
-
- sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
- long lPollLtcy = (null != sAtt) ? (1000 * Integer.parseInt(sAtt)) : 20000;
- if (lPollLtcy < 3000)
- {
- lPollLtcy = 3000;
-
- }
- boolean bFirst = true;
- while (System.currentTimeMillis() <= lNewLoad)
- {
- for (DomElement oCurr : oaParms)
- {
- oneScan(oCurr, bFirst);
- }
- long lSlack = lNewLoad - System.currentTimeMillis();
- if (lSlack < 0)
- {
- break;
- }
- if (waitForQuiesce(Math.min(lSlack, lPollLtcy)))
- {
- return false;
- }
- bFirst = false;
- }
- return true;
- } //_________________________________________
-
- public void setupSubscribe() throws JMSException, NamingException
- {
- try
- {
- String sStopTopic = "topic/quiesce";
- StringBuilder sb = new StringBuilder("processor='DirListener'");
-
- String sJndiType = EsbSysProps.getJndiServerType();
- String sJndiServer = EsbSysProps.getJndiServerURL();
- Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiServer);
-
- Object tmp = oJndiCtx.lookup("UIL2ConnectionFactory");
- TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
- m_oTopicConn = tcf.createTopicConnection();
- m_oTopic = (Topic) oJndiCtx.lookup(sStopTopic);
- m_oSession = m_oTopicConn.createTopicSession
- (false,TopicSession.AUTO_ACKNOWLEDGE);
- m_oTopicConn.start();
- m_oTopicSub = m_oSession.createSubscriber(m_oTopic, sb.toString(),true);
- }
- catch (Exception e)
- { m_oLogger.error("Problems connecting to JMS. ",e);
- }
-
-
- } //_________________________________________
-
- private boolean waitForQuiesce(long p_lMillis) throws Exception
- {
- try
- { boolean bRec = false;
-
- if (null != secureReceive(p_lMillis))
- { bRec = true;
- m_oLogger.info("Starting Quiesce of Listener. ");
- }
- else
- Thread.sleep(p_lMillis);
- return bRec;
-
- }
- catch (Exception e)
- { m_oLogger.error("Problems with waitForQuiesce. ",e);
- Thread.sleep(p_lMillis);
- return false;
- }
- } //_________________________________________
-
- private Object secureReceive(long p_lMillis) throws Exception
- {
- while (true)
- try
- { return (null==m_oTopicSub) ? null : m_oTopicSub.receive(p_lMillis); }
- catch (JMSException e)
- {
- // put here your recovery code
- return null;
- }
-
- } //_________________________________________
-
-
- private void oneScan(DomElement p_oP, boolean p_bFirst) throws Exception
- {
- String sPrcName = p_oP.getName();
- if (!m_omChildPrc.containsKey(sPrcName))
- {
- ThreadGroup[] oaCh = new ThreadGroup[m_oThrGrp.activeGroupCount()];
- int iMax = m_oThrGrp.enumerate(oaCh);
-
- ThreadGroup oThG = null;
- for (int i1 = 0; null == oThG && i1 < iMax; i1++)
- { if (m_oThrGrp.getName().equals(sPrcName))
- oThG = oaCh[i1];
- }
- if (null == oThG)
- oThG = new ThreadGroup(sPrcName);
- m_omChildPrc.put(sPrcName, new GroupOfChilds(oThG));
- }
- GroupOfChilds oCnt = m_omChildPrc.get(sPrcName);
-
- if (null == oCnt) return;
- if (p_bFirst)
- oCnt.m_bError = false;
-
- try
- {
- oCnt.execute(p_oP);
- }
- catch (Exception e)
- {
- oCnt.m_bError = true;
- m_oLogger.error("GroupOfChilds.execute", e);
- }
- } //_________________________________________
-
- private class GroupOfChilds implements Observer
- {
- private ThreadGroup m_oThrGrp;
- private boolean m_bError = false;
-
- private File m_oInpDir;
- private FileFilter m_oFFilt;
- private Class m_oExecClass;
- private DomElement m_oChParms;
- private int m_iQthr = 0, m_iMaxThr;
- private StringBuilder m_sb;
- private int m_iSbIni;
-
- private GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception
- {
- m_oThrGrp = p_oThrGrp;
- m_sb = new StringBuilder("GroupOfThreads ")
- .append(m_oThrGrp.getName()).append(" : ");
- m_iSbIni = m_sb.length();
- } //________________________________
-
- public void update(Observable p_oObs, Object p_oUsrObj)
- {
- if (p_oUsrObj instanceof Integer)
- {
- updQthreads( ( (Integer) p_oUsrObj).intValue());
- }
- } //________________________________
-
- private synchronized void updQthreads(int p_i)
- {
- m_iQthr += p_i;
- } //________________________________
-
- private void execute(DomElement p_oP) throws Exception
- {
- m_sb.setLength(m_iSbIni);
- if (m_bError)
- {
- m_oLogger.warn(m_sb.append(" Skipping execution due to previous errors")
- .toString());
- return;
- }
- checkParms(p_oP);
-
- File[] oaF = m_oInpDir.listFiles(m_oFFilt);
-
- for (File oFcurr : oaF)
- {
- if (m_iQthr >= m_iMaxThr)
- {
- m_oLogger.info(m_sb.append("Waiting for available threads").toString());
- Thread.sleep(5000);
- break;
- }
- m_oChParms.rmvChildsByName(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
- DomElement oThisProc = new DomElement(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
- oThisProc.setAttr(ParamsFileProcessor.FPRC_FILENAME, oFcurr.getName());
- m_oChParms.addElemChild(oThisProc);
-
- new Thread(m_oThrGrp,
- new OneChildProcess(m_oExecClass, this, m_oChParms)).start();
- Thread.sleep(500);
- }
- } //________________________________
-
- protected void checkParms(DomElement p_oP) throws Exception
- {
- String sAtt = p_oP.getAttr(PARM_MAX_THREADS);
- m_iMaxThr = (null == sAtt) ? 1 : Integer.parseInt(sAtt);
- if (m_iMaxThr > 10)
- {
- m_iMaxThr = 10;
- }
- String sUid = p_oP.getName();
- m_oChParms = p_oP.cloneObj();
- sAtt = m_oChParms.getAttr(PARM_INPUT_DIR);
- if (null == sAtt)
- {
- throw new Exception(m_sb.append("Missing ").append(PARM_INPUT_DIR)
- .append(" attribute in -parameters ")
- .append(sUid).toString());
- }
- m_oInpDir = new File(new URI(sAtt));
- if (!m_oInpDir.isDirectory())
- {
- throw new Exception(m_sb.append(sAtt).append(" is not a directory").
- toString());
- }
- if (!m_oInpDir.canRead())
- {
- throw new Exception(m_sb.append("Can't read directory ").append(sAtt).
- toString());
- }
-
- sAtt = m_oChParms.getAttr(PARM_SUFFIX);
- if (null == sAtt)
- {
- throw new Exception(m_sb.append("Missing ").append(PARM_SUFFIX)
- .append(" attribute in -parameters ")
- .append(sUid).toString());
- }
-
- m_oFFilt = new FileEndsWith(sAtt);
-
- do
- {
- sAtt = p_oP.getAttr(PARM_FILE_PROCESSOR_CLASS);
- m_oExecClass = null;
- if (null == sAtt)
- {
- throw new Exception(m_sb.append("Missing actionClass attribute").
- toString());
- }
- try
- {
- m_oExecClass = Class.forName(sAtt);
- }
- catch (ClassNotFoundException e)
- {
- throw new Exception(m_sb.append("Class ").append(sAtt)
- .append(" not found in classpath").toString());
- }
- try
- {
- m_oExecClass.getConstructor(new Class[] {DomElement.class});
- }
- catch (NoSuchMethodException eN)
- {
- throw new Exception(m_sb.append("No appropriate constructor")
- .append(" (DomElement) found for class").toString());
- }
- }
- while (false);
- } //________________________________
-
- private class FileEndsWith implements FileFilter
- {
- String m_sSuffix;
- FileEndsWith(String p_sEnd) throws Exception
- {
- m_sSuffix = p_sEnd;
- if (null == m_sSuffix)
- {
- throw new Exception("Must specify file extension");
- }
- } //_________________________________________
-
- public boolean accept(File p_f)
- {
- if (!p_f.isFile())
- {
- return false;
- }
- return p_f.toString().endsWith(m_sSuffix);
- } //_________________________________________
- } //___________________________________________________
- } //______________________________________________________
-} //____________________________________________________________________________
Copied: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/OldDirListener.java (from rev 4902, labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java)
===================================================================
--- labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/DirListener.java 2006-07-03 16:58:38 UTC (rev 4902)
+++ labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/OldDirListener.java 2006-07-03 20:15:11 UTC (rev 4903)
@@ -0,0 +1,440 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
+
+package org.jboss.soa.esb.listeners;
+
+import java.io.*;
+import java.lang.reflect.Constructor;
+import java.net.*;
+import java.util.*;
+
+import org.apache.log4j.*;
+
+import javax.naming.*;
+import javax.jms.*;
+
+import org.jboss.soa.esb.util.*;
+import org.jboss.soa.esb.common.*;
+import org.jboss.soa.esb.helpers.*;
+import org.jboss.soa.esb.parameters.*;
+import org.jboss.soa.esb.processors.*;
+
+//import org.jboss.soa.esb.nagios.*;
+
+public class OldDirListener
+{
+ public static void main(String[] args) throws Exception
+ {
+ new OldDirListener(args[0]);
+ } //________________________________
+
+ public static final String PARM_POLL_LTCY = "pollLatency";
+ public static final String PARM_RELOAD_LTCY = "reloadLatency";
+
+ public static final String PARM_MAX_THREADS = "maxThreads";
+ public static final String PARM_FILE_PROCESSOR_CLASS = "fileProcessorClass";
+
+ public static final String PARM_INPUT_DIR = "inputDirURI";
+ public static final String PARM_SUFFIX = "inputSuffix";
+
+ private Map<String,GroupOfChilds> m_omChildPrc
+ = new HashMap<String,GroupOfChilds>();
+
+ private ThreadGroup m_oThrGrp = Thread.currentThread().getThreadGroup();
+
+ private Logger m_oLogger;
+ private DomElement m_oParms;
+ private ParamsRepository m_oParmRepos;
+
+ private TopicConnection m_oTopicConn = null;
+ private TopicSession m_oSession = null;
+ private Topic m_oTopic = null;
+ private TopicSubscriber m_oTopicSub = null;
+
+ public OldDirListener(String p_sParamsUid) throws Exception
+ {
+ m_oLogger = EsbUtil.getDefaultLogger(this.getClass());
+// setupSubscribe();
+ /*
+ * Removing Nagios integration
+ try
+ {
+ String sNagServer = System.getProperty("jbossEsb.nagios.server");
+ if (sNagServer != null)
+ {
+ int iNagPort = Integer.parseInt
+ (System.getProperty("jbossEsb.nagios.port","5667"));
+ String sNagService =
+ System.getProperty("jbossEsb.nagios.service");
+ new NagiosStandaloneHeartbeat(
+ sNagServer, iNagPort, sNagService, "rosetta-listener-service");
+ }
+ }
+ catch (Exception eNagios)
+ {
+ m_oLogger.info("Problems with Nagios Notification", eNagios);
+ }
+ */
+
+ String sFactoryClass = EsbSysProps.getParamsReposFactoryClass();
+ m_oParmRepos = ParamsReposUtil.reposFromFactory(sFactoryClass,null);
+
+ Name oParms = ParamsReposUtil.nameFromString(p_sParamsUid);
+ while (loadParmsCycle(oParms))
+ {
+ }
+ } //__________________________________
+
+ protected boolean loadParmsCycle(Name p_oParams) throws Exception
+ {
+ String sMsg = (null == m_oParms)
+ ? "Initial Parameter loading" : "Reloading Params";
+ m_oLogger.info(sMsg);
+
+ try
+ {
+ m_oParms = m_oParmRepos.getElement(p_oParams);
+ }
+ catch (Exception e)
+ {
+ m_oLogger.warn("Failed to load parameters");
+ if (null == m_oParms)
+ {
+ throw e;
+ }
+ }
+
+ String sAtt = m_oParms.getAttr(PARM_RELOAD_LTCY);
+ long lNewLoad = System.currentTimeMillis()
+ + ( (null != sAtt) ? (1000 * Integer.parseInt(sAtt)) : 180000);
+ DomElement[] oaParms = m_oParms.getAllElemChildren();
+
+ sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
+ long lPollLtcy = (null != sAtt) ? (1000 * Integer.parseInt(sAtt)) : 20000;
+ if (lPollLtcy < 3000)
+ {
+ lPollLtcy = 3000;
+
+ }
+ boolean bFirst = true;
+ while (System.currentTimeMillis() <= lNewLoad)
+ {
+ for (DomElement oCurr : oaParms)
+ {
+ oneScan(oCurr, bFirst);
+ }
+ long lSlack = lNewLoad - System.currentTimeMillis();
+ if (lSlack < 0)
+ {
+ break;
+ }
+ if (waitForQuiesce(Math.min(lSlack, lPollLtcy)))
+ {
+ return false;
+ }
+ bFirst = false;
+ }
+ return true;
+ } //_________________________________________
+
+ public void setupSubscribe() throws JMSException, NamingException
+ {
+ try
+ {
+ String sStopTopic = "topic/quiesce";
+ StringBuilder sb = new StringBuilder("processor='OldDirListener'");
+
+ String sJndiType = EsbSysProps.getJndiServerType();
+ String sJndiServer = EsbSysProps.getJndiServerURL();
+ Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiServer);
+
+ Object tmp = oJndiCtx.lookup("UIL2ConnectionFactory");
+ TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
+ m_oTopicConn = tcf.createTopicConnection();
+ m_oTopic = (Topic) oJndiCtx.lookup(sStopTopic);
+ m_oSession = m_oTopicConn.createTopicSession
+ (false,TopicSession.AUTO_ACKNOWLEDGE);
+ m_oTopicConn.start();
+ m_oTopicSub = m_oSession.createSubscriber(m_oTopic, sb.toString(),true);
+ }
+ catch (Exception e)
+ { m_oLogger.error("Problems connecting to JMS. ",e);
+ }
+
+
+ } //_________________________________________
+
+ private boolean waitForQuiesce(long p_lMillis) throws Exception
+ {
+ try
+ { boolean bRec = false;
+
+ if (null != secureReceive(p_lMillis))
+ { bRec = true;
+ m_oLogger.info("Starting Quiesce of Listener. ");
+ }
+ else
+ Thread.sleep(p_lMillis);
+ return bRec;
+
+ }
+ catch (Exception e)
+ { m_oLogger.error("Problems with waitForQuiesce. ",e);
+ Thread.sleep(p_lMillis);
+ return false;
+ }
+ } //_________________________________________
+
+ private Object secureReceive(long p_lMillis) throws Exception
+ {
+ while (true)
+ try
+ { return (null==m_oTopicSub) ? null : m_oTopicSub.receive(p_lMillis); }
+ catch (JMSException e)
+ {
+ // put here your recovery code
+ return null;
+ }
+
+ } //_________________________________________
+
+
+ private void oneScan(DomElement p_oP, boolean p_bFirst) throws Exception
+ {
+ String sPrcName = p_oP.getName();
+ if (!m_omChildPrc.containsKey(sPrcName))
+ {
+ ThreadGroup[] oaCh = new ThreadGroup[m_oThrGrp.activeGroupCount()];
+ int iMax = m_oThrGrp.enumerate(oaCh);
+
+ ThreadGroup oThG = null;
+ for (int i1 = 0; null == oThG && i1 < iMax; i1++)
+ { if (m_oThrGrp.getName().equals(sPrcName))
+ oThG = oaCh[i1];
+ }
+ if (null == oThG)
+ oThG = new ThreadGroup(sPrcName);
+ m_omChildPrc.put(sPrcName, new GroupOfChilds(oThG));
+ }
+ GroupOfChilds oCnt = m_omChildPrc.get(sPrcName);
+
+ if (null == oCnt) return;
+ if (p_bFirst)
+ oCnt.m_bError = false;
+
+ try
+ {
+ oCnt.execute(p_oP);
+ }
+ catch (Exception e)
+ {
+ oCnt.m_bError = true;
+ m_oLogger.error("GroupOfChilds.execute", e);
+ }
+ } //_________________________________________
+
+ private class GroupOfChilds implements Observer
+ {
+ private ThreadGroup m_oThrGrp;
+ private boolean m_bError = false;
+
+ private File m_oInpDir;
+ private FileFilter m_oFFilt;
+ private Class m_oExecClass;
+ private DomElement m_oChParms;
+ private int m_iQthr = 0, m_iMaxThr;
+ private StringBuilder m_sb;
+ private int m_iSbIni;
+
+ private GroupOfChilds(ThreadGroup p_oThrGrp) throws Exception
+ {
+ m_oThrGrp = p_oThrGrp;
+ m_sb = new StringBuilder("GroupOfThreads ")
+ .append(m_oThrGrp.getName()).append(" : ");
+ m_iSbIni = m_sb.length();
+ } //________________________________
+
+ public void update(Observable p_oObs, Object p_oUsrObj)
+ {
+ if (p_oUsrObj instanceof Integer)
+ {
+ updQthreads( ( (Integer) p_oUsrObj).intValue());
+ }
+ } //________________________________
+
+ private synchronized void updQthreads(int p_i)
+ {
+ m_iQthr += p_i;
+ } //________________________________
+
+ private void execute(DomElement p_oP) throws Exception
+ {
+ m_sb.setLength(m_iSbIni);
+ if (m_bError)
+ {
+ m_oLogger.warn(m_sb.append(" Skipping execution due to previous errors")
+ .toString());
+ return;
+ }
+ checkParms(p_oP);
+
+ File[] oaF = m_oInpDir.listFiles(m_oFFilt);
+
+ for (File oFcurr : oaF)
+ {
+ if (m_iQthr >= m_iMaxThr)
+ {
+ m_oLogger.info(m_sb.append("Waiting for available threads").toString());
+ Thread.sleep(5000);
+ break;
+ }
+ m_oChParms.rmvChildsByName(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
+ DomElement oThisProc = new DomElement(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
+ oThisProc.setAttr(ParamsFileProcessor.FPRC_FILENAME, oFcurr.getName());
+ m_oChParms.addElemChild(oThisProc);
+
+ new Thread(m_oThrGrp,
+ new ChildProcess(m_oExecClass, this, m_oChParms)).start();
+ Thread.sleep(500);
+ }
+ } //________________________________
+
+ protected void checkParms(DomElement p_oP) throws Exception
+ {
+ String sAtt = p_oP.getAttr(PARM_MAX_THREADS);
+ m_iMaxThr = (null == sAtt) ? 1 : Integer.parseInt(sAtt);
+ if (m_iMaxThr > 10)
+ {
+ m_iMaxThr = 10;
+ }
+ String sUid = p_oP.getName();
+ m_oChParms = p_oP.cloneObj();
+ sAtt = m_oChParms.getAttr(PARM_INPUT_DIR);
+ if (null == sAtt)
+ {
+ throw new Exception(m_sb.append("Missing ").append(PARM_INPUT_DIR)
+ .append(" attribute in -parameters ")
+ .append(sUid).toString());
+ }
+ m_oInpDir = new File(new URI(sAtt));
+ if (!m_oInpDir.isDirectory())
+ {
+ throw new Exception(m_sb.append(sAtt).append(" is not a directory").
+ toString());
+ }
+ if (!m_oInpDir.canRead())
+ {
+ throw new Exception(m_sb.append("Can't read directory ").append(sAtt).
+ toString());
+ }
+
+ sAtt = m_oChParms.getAttr(PARM_SUFFIX);
+ if (null == sAtt)
+ {
+ throw new Exception(m_sb.append("Missing ").append(PARM_SUFFIX)
+ .append(" attribute in -parameters ")
+ .append(sUid).toString());
+ }
+
+ m_oFFilt = new FileEndsWith(sAtt);
+
+ do
+ {
+ sAtt = p_oP.getAttr(PARM_FILE_PROCESSOR_CLASS);
+ m_oExecClass = null;
+ if (null == sAtt)
+ {
+ throw new Exception(m_sb.append("Missing actionClass attribute").
+ toString());
+ }
+ try
+ {
+ m_oExecClass = Class.forName(sAtt);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new Exception(m_sb.append("Class ").append(sAtt)
+ .append(" not found in classpath").toString());
+ }
+ try
+ {
+ m_oExecClass.getConstructor(new Class[] {DomElement.class});
+ }
+ catch (NoSuchMethodException eN)
+ {
+ throw new Exception(m_sb.append("No appropriate constructor")
+ .append(" (DomElement) found for class").toString());
+ }
+ }
+ while (false);
+ } //________________________________
+
+ private class FileEndsWith implements FileFilter
+ {
+ String m_sSuffix;
+ FileEndsWith(String p_sEnd) throws Exception
+ {
+ m_sSuffix = p_sEnd;
+ if (null == m_sSuffix)
+ {
+ throw new Exception("Must specify file extension");
+ }
+ } //_________________________________________
+
+ public boolean accept(File p_f)
+ {
+ if (!p_f.isFile())
+ {
+ return false;
+ }
+ return p_f.toString().endsWith(m_sSuffix);
+ } //_________________________________________
+ } //___________________________________________________
+ } //______________________________________________________
+
+ private static class ChildProcess extends Observable implements Runnable
+ { private Class m_oExecClass;
+ private DomElement m_oParms;
+ private Logger m_oLogger;
+ public ChildProcess(Class p_oExec, Observer p_oObs, DomElement p_oP)
+ { m_oLogger = EsbUtil.getDefaultLogger(this.getClass());
+ m_oExecClass = p_oExec;
+ this.addObserver(p_oObs);
+ m_oParms = p_oP;
+ setChanged();
+ notifyObservers(new Integer(1));
+ } //__________________________________
+
+ public void run()
+ { try
+ { Constructor oCnst = m_oExecClass.getConstructor(new Class[] {DomElement.class});
+ Object oProc = oCnst.newInstance(new Object[] {m_oParms});
+ ((EsbFileProcessor)oProc).execute();
+ }
+ catch (Exception e) { m_oLogger.error("run() FAILED",e); }
+
+ setChanged();
+ notifyObservers(new Integer(-1));
+ } //__________________________________
+ } //____________________________________________________________________________
+} //____________________________________________________________________________
Deleted: labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/OneChildProcess.java
===================================================================
--- labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/OneChildProcess.java 2006-07-03 16:58:38 UTC (rev 4902)
+++ labs/jbossesb/trunk/ESBCore/EsbListeners/src/org/jboss/soa/esb/listeners/OneChildProcess.java 2006-07-03 20:15:11 UTC (rev 4903)
@@ -1,59 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
-
-
-package org.jboss.soa.esb.listeners;
-
-import java.util.*;
-import java.lang.reflect.*;
-import org.apache.log4j.*;
-
-import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.processors.*;
-import org.jboss.soa.esb.util.EsbUtil;
-
-
-public class OneChildProcess extends Observable implements Runnable
-{ private Class m_oExecClass;
- private DomElement m_oParms;
- private Logger m_oLogger;
- public OneChildProcess(Class p_oExec, Observer p_oObs, DomElement p_oP)
- { m_oLogger = EsbUtil.getDefaultLogger(this.getClass());
- m_oExecClass = p_oExec;
- this.addObserver(p_oObs);
- m_oParms = p_oP;
- setChanged();
- notifyObservers(new Integer(1));
- } //__________________________________
-
- public void run()
- { try
- { Constructor oCnst = m_oExecClass.getConstructor(new Class[] {DomElement.class});
- Object oProc = oCnst.newInstance(new Object[] {m_oParms});
- ((EsbFileProcessor)oProc).execute();
- }
- catch (Exception e) { m_oLogger.error("run() FAILED",e); }
-
- setChanged();
- notifyObservers(new Integer(-1));
- } //__________________________________
-} //____________________________________________________________________________
|