From: <jbo...@li...> - 2006-07-05 10:43:47
|
Author: estebanschifman Date: 2006-07-05 06:43:42 -0400 (Wed, 05 Jul 2006) New Revision: 4910 Added: labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java Log: Class that can be configured to poll an SQL table for specific content, and trigger an action Class if conditions are met Added: labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java =================================================================== --- labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-07-05 10:40:41 UTC (rev 4909) +++ labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-07-05 10:43:42 UTC (rev 4910) @@ -0,0 +1,426 @@ +/* +* JBoss, Home of Professional Open Source +* Copyright 2006, JBoss Inc., and individual contributors as indicated +* by the @authors tag. See the copyright.txt in the distribution for a +* full listing of individual contributors. +* +* This is free software; you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as +* published by the Free Software Foundation; either version 2.1 of +* the License, or (at your option) any later version. +* +* This software is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +* Lesser General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public +* License along with this software; if not, write to the Free +* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA +* 02110-1301 USA, or see the FSF site: http://www.fsf.org. 
*/


package org.jboss.soa.esb.listeners;

import java.util.*;
import java.lang.reflect.*;
import java.sql.*;
import javax.sql.*;

import org.jboss.soa.esb.util.*;
import org.jboss.soa.esb.helpers.*;
import org.jboss.soa.esb.helpers.persist.*;
import org.jboss.soa.esb.processors.*;

/**
 * Poller that scans an SQL table for rows in the "Pending" state and starts
 * one thread per row found; each thread claims its row and then runs a new
 * instance of the configured action class.
 */
public class SqlTablePoller extends AbstractPoller
{
  /**
   * These are the mandatory attributes needed for any SqlTablePoller
   * 1) Table name
   * 2) Comma separated list of fields needed in the ResultSet
   * 3) Comma separated list of fields that constitute a unique ID of the working row
   *    all fields in this list MUST also be in the "selectFields" list
   *    these fields will be used in the "where" clause of update statements
   * 4) Name of table field used as indicator/semaphore to avoid concurrent update
   */
  public static enum TABLE_ATT
  {
    tableName
    ,selectFields
    ,keyFields
    ,inProcessField
  };

  /**
   * Optional fields that can be included in your parameter tree as attributes in the
   * upper Element
   * 1) 4 character long String that indicates the status of each row for this poller
   *    1st: Character that indicates "Pending" state = available for processing
   *    2nd: "Working" : some poller is working on the row (or ab-ended while working)
   *    3rd: "Error"   : some poller tried to process, and found an error during processing
   *    4th: "Done"    : this row has already been processed successfully
   * 2) if you wish to further filter your ResultSet, you can add an optional list of
   *    conditions that will be included in the "scan" SQL statement (without "where")
   * 3) Comma separated list of fields to order ResultSet (without "order by")
   */
  public static enum OPTIONAL_ATT
  {
    inProcessVals
    ,whereCondition
    ,orderBy
  };

  /**
   * First character of these values are the default states of a table row trigger
   * the "inProcessVals" parameter can override these (if that were ever necessary)
   * this is why the default value for that parameter is "PWED" (see below)
   * The poller will only process rows that have the "inProcessField" first character
   * equal to the first character of whatever the "Pending" state is (typically "P")
   *
   * The declaration order matters: getStatus() uses each constant's ordinal as the
   * index into the "inProcessVals" string.
   */
  public static enum ROW_STATE
  {
    Pending
    ,Working
    ,Error
    ,Done
  };

  public static final String DEFAULT_STATES = "PWED";

  /** Validated configuration attributes, keyed by attribute name. */
  protected Map<String,String> m_oVals = new HashMap<String,String>();

  /**
   * Main program for SqlTablePoller
   * @param args - String[] First parameter must be path to configuration XML file
   * @throws Exception if the argument is missing or the poller cannot be started
   */
  public static void main(String[] args) throws Exception
  {
    if (null == args || args.length < 1)
      throw new IllegalArgumentException
        ("Usage: SqlTablePoller <path to configuration XML file>");
    new SqlTablePoller(args[0]);
  } //________________________________

  /**
   * Sets the default timing values and then runs until an end is requested.
   * @param p_sParamsUid - handed unchanged to the superclass constructor
   * @throws Exception - whatever the superclass constructor or the run loop throws
   */
  public SqlTablePoller(String p_sParamsUid) throws Exception
  {
    super(p_sParamsUid);
    m_iDfltReloadMillis = 180000;
    m_iDfltPollMillis   = 20000;
    m_iMinPollMillis    = 5000;
//  See superclass - It provides ability to request end by subscribing to a Topic
    runUntilEndRequested();
  } //__________________________________

  @Override
  protected GroupOfChilds newChildGroup(ThreadGroup pThG) throws Exception
  {
    return new SqlPollerChildGroup(pThG);
  } //__________________________________

  /**
   * Validates the poller parameters, scans the table and starts one
   * SqlChildProcess thread per row returned by the scan statement.
   */
  private class SqlPollerChildGroup extends AbstractPoller.GroupOfChilds
  {
    JdbcCleanConn m_oConn;
    String[]      m_saKeys;   // trimmed names of the "keyFields" columns

    private SqlPollerChildGroup(ThreadGroup p_oThrGrp) throws Exception
    { super(p_oThrGrp);
    } //________________________________

    /**
     * Store the value that obtainAtt() returns for an attribute in m_oVals.
     * @param p_oP       - parameter tree
     * @param p_sAtt     - attribute name (also used as the key in m_oVals)
     * @param p_sDefault - default handed to obtainAtt()
     * @throws Exception - whatever obtainAtt() throws
     */
    protected void checkAtt(DomElement p_oP,String p_sAtt, String p_sDefault)
      throws Exception
    { m_oVals.put(p_sAtt,obtainAtt(p_oP,p_sAtt,p_sDefault));
    } //________________________________

    /**
     * Collect and validate all attributes this poller needs.
     * @param p_oP - parameter tree
     * @throws Exception - if "inProcessVals" is shorter than 4 characters,
     *         "keyFields" is empty, or a key field is not in "selectFields"
     */
    protected void checkParms(DomElement p_oP) throws Exception
    {
      super.checkParms(p_oP);

      checkAtt(p_oP,SimpleDataSource.DRIVER   ,null);
      checkAtt(p_oP,SimpleDataSource.URL      ,null);
      checkAtt(p_oP,SimpleDataSource.USER     ,"");
      checkAtt(p_oP,SimpleDataSource.PASSWORD ,"");

      for (TABLE_ATT oCurr : TABLE_ATT.values())
        checkAtt(p_oP,oCurr.toString(),null);

      checkAtt(p_oP,OPTIONAL_ATT.whereCondition.toString(),"");
      checkAtt(p_oP,OPTIONAL_ATT.orderBy.toString(),"");

      String sAtt = OPTIONAL_ATT.inProcessVals.toString();
      checkAtt(p_oP,sAtt,DEFAULT_STATES);
      String sStates = m_oVals.get(sAtt);
      if (sStates.length()<4)
        throw new Exception(formatLogMsg
          ("Parameter <"+sAtt+"> must be at least 4 characters long (PWED)"
          ));

      checkAtt(p_oP,PARM_ACTION_CLASS,null);
      m_oExecClass = super.checkActionClass(m_oVals.get(PARM_ACTION_CLASS));

      StringTokenizer ST = new StringTokenizer
        (m_oVals.get(TABLE_ATT.selectFields.toString()),",");
      Set<String> oSelFlds = new HashSet<String>();
      while (ST.hasMoreTokens())
        oSelFlds.add(ST.nextToken().trim());

      ST = new StringTokenizer
        (m_oVals.get(TABLE_ATT.keyFields.toString()),",");
      m_saKeys = new String[ST.countTokens()];
      if (m_saKeys.length < 1)
        throw new Exception(formatLogMsg("Empty list of keyFields"));

      for (int iCurr = 0; ST.hasMoreTokens(); iCurr++)
      { String sKey = ST.nextToken().trim();
        if (! oSelFlds.contains(sKey))
          throw new Exception(formatLogMsg("Key field <"+ sKey + "> must also be in select list"));
        m_saKeys[iCurr] = sKey;
      }

    } //________________________________

    /**
     * Start one child thread per row returned by getTriggers(); stop for this
     * round as soon as the thread limit is reached.
     */
    @Override
    protected void doYourJob(DomElement p_oP) throws Exception
    {
      for (DomElement oCurr : getTriggers())
      {
        if (m_iQthr >= m_iMaxThr)
        {
          m_oLogger.info(m_sb.append("Waiting for available threads").toString());
          Thread.sleep(5000);
          break;
        }

        new Thread(m_oThrGrp,
                   new SqlChildProcess(this, oCurr)).start();
        // Wait a little bit, so thread count will be updated
        // at some point in the past, this sleep was indispensable
        // new thread control classes in Java 5 should be explored
        Thread.sleep(500);
      }

    } //________________________________

    /**
     * Run the scan statement and turn every row into a DomElement that has
     * one attribute per selected column.
     * @return one element per row found (possibly empty)
     * @throws Exception - if problems are encountered
     */
    protected List<DomElement> getTriggers() throws Exception
    {
      JdbcCleanConn oConn = newDbConn();
      try
      {
        PreparedStatement PS = oConn.prepareStatement(scanStatement());
        ResultSet RS = oConn.execQueryWait(PS,1);

        ResultSetMetaData oMeta = RS.getMetaData();
        String[] saColName = new String[oMeta.getColumnCount()];
        for (int i1=0; i1<saColName.length; i1++)
          saColName[i1] = oMeta.getColumnName(1+i1);

        List<DomElement> oResults = new ArrayList<DomElement>();
        while (RS.next())
        { DomElement oNew = new DomElement(EsbAbstractProcessor.PARMS_THIS_INSTANCE);
          // Read by position: it cannot fail on aliased or duplicated column names
          for (int i1=0; i1<saColName.length; i1++)
            oNew.setAttr(saColName[i1],RS.getString(1+i1));
          oResults.add(oNew);
        }
        return oResults;
      }
      finally
      { // release the connection even when the scan fails
        oConn.release();
      }
    } //________________________________

    /**
     * Obtain a new database connection with parameter info
     * @return A new connection
     * @throws Exception - if problems are encountered
     */
    protected JdbcCleanConn newDbConn() throws Exception
    { DataSource oDS = new SimpleDataSource
        (m_oVals.get(SimpleDataSource.DRIVER)
        ,m_oVals.get(SimpleDataSource.URL)
        ,m_oVals.get(SimpleDataSource.USER)
        ,m_oVals.get(SimpleDataSource.PASSWORD)
        );
      return new JdbcCleanConn(oDS);
    } //________________________________

    /**
     * Assemble the SQL statement to scan (poll) the table
     * @return - The resulting SQL statement
     */
    protected String scanStatement()
    {
      StringBuilder sb = new StringBuilder ()
        .append("select ").append(m_oVals.get(TABLE_ATT.selectFields.toString()))
        .append(" from ") .append(m_oVals.get(TABLE_ATT.tableName.toString()))
        .append(" where ");

      // Parenthesise the optional user condition, so that an "or" inside it
      // cannot bypass the "Pending" filter through operator precedence
      String sAux = m_oVals.get(OPTIONAL_ATT.whereCondition.toString());
      if (! EsbUtil.isNullString(sAux))
        sb.append("(").append(sAux).append(") and ");

      String sLike = m_oVals.get(OPTIONAL_ATT.inProcessVals.toString())
                       .substring(0,1).toUpperCase();
      sb.append("upper(").append(m_oVals.get(TABLE_ATT.inProcessField.toString()))
        .append(") like '").append(sLike).append("%'");

      // m_oVals is keyed by String: the enum constant itself would never match
      sAux = m_oVals.get(OPTIONAL_ATT.orderBy.toString());
      if (! EsbUtil.isNullString(sAux))
        sb.append(" order by ").append(sAux);
      return sb.toString();
    } //________________________________

    /**
     * Assemble the SQL statement to update the field
     * in the "inProcessField" parameter
     *
     * in the table row uniquely identified by the list of fields
     * in the "keyFields" parameter
     *
     * Parameter 1 is the new field value; parameters 2..n+1 are the key values.
     *
     * @return - The resulting SQL statement
     */
    protected String updateStatement()
    {
      StringBuilder sb = new StringBuilder ()
        .append("update ").append(m_oVals.get(TABLE_ATT.tableName.toString()))
        .append(" set ")  .append(m_oVals.get(TABLE_ATT.inProcessField.toString()))
        .append(" = ? where ")
        ;
      return appendKeyCondition(sb).toString();
    } //________________________________

    /**
     * Assemble the SQL "select for update" statement
     * for the "inProcessField" parameter
     *
     * in the table row uniquely identified by the list of fields
     * in the "keyFields" parameter
     *
     * Parameters 1..n are the key values.
     *
     * @return - The resulting SQL statement
     */
    protected String selectForUpdStatement()
    {
      StringBuilder sb = new StringBuilder ()
        .append("select ").append(m_oVals.get(TABLE_ATT.inProcessField.toString()))
        .append(" from ")  .append(m_oVals.get(TABLE_ATT.tableName.toString()))
        .append(" where ")
        ;
      return appendKeyCondition(sb).append(" for update").toString();
    } //________________________________

    /** Append "key1 = ? and key2 = ? ..." for all key fields; returns p_oSb. */
    private StringBuilder appendKeyCondition(StringBuilder p_oSb)
    {
      for (int iCurr = 0; iCurr < m_saKeys.length; iCurr++)
      { if (iCurr > 0)
          p_oSb.append(" and ");
        p_oSb.append(m_saKeys[iCurr]).append(" = ?");
      }
      return p_oSb;
    } //________________________________

  } //______________________________________________________

  /**
   * The child process group will try to start a new thread for each
   * element in the ResultSet returned by the scanStatement()
   * The controlling SqlPollerChildGroup will not start a thread if
   * "maxThreads" is exceeded
   * The idea is to obtain a new instance of the "actionClass" for each
   * row of the resultSet
   * Contents of the table field indicated by the "inProcessField" parameter are used
   * to control concurrency (see enum ROW_STATE for details)
   *
   * Observers are sent Integer 1 when an instance is created and Integer -1
   * when its run() ends, whatever the outcome.
   *
   * @author Esteban
   *
   */
  protected class SqlChildProcess extends Observable implements Runnable
  {
    protected SqlPollerChildGroup m_oParent;  // you can always go there for common stuff
    protected DomElement          m_oInstP;   // values for this instance
    protected JdbcCleanConn       m_oConn;    // each child has its own DB connection
    protected PreparedStatement   m_PSsel4U ,m_PSupd;
    protected String              m_sUpdStates;

    /**
     * @param p_oState - a row state
     * @return the single character of "inProcessVals" that stands for that state
     */
    protected String getStatus(ROW_STATE p_oState)
    { int iPos = p_oState.ordinal();
      return m_sUpdStates.substring(iPos,iPos+1);
    } //________________________________

    /**
     * @param p_oGrp - controlling group; registered as observer of this child
     * @param p_oP   - column values of the row this child will work on
     * @throws Exception - if no database connection can be obtained
     */
    public SqlChildProcess(SqlPollerChildGroup p_oGrp, DomElement p_oP)
      throws Exception
    {
      m_oParent    = p_oGrp;
      m_oInstP     = p_oP;
      m_sUpdStates = m_oVals.get(OPTIONAL_ATT.inProcessVals.toString());
      // Connect before announcing this child: if the connection fails no
      // "+1" is sent, so there is nothing left unbalanced
      m_oConn      = p_oGrp.newDbConn();

      this.addObserver(m_oParent);
      setChanged();
      notifyObservers(Integer.valueOf(1));
    } //__________________________________

    /**
     * Claim the row (Pending to Working), run the action class on it and
     * record the outcome. The connection is released and observers are
     * notified with -1 on every exit path.
     */
    public void run()
    {
      try
      { if (claimRow())
          processRow();
      }
      finally
      { try
        { if (null!=m_oConn)
            m_oConn.release();
        }
        finally
        { setChanged();
          notifyObservers(Integer.valueOf(-1));
        }
      }
    } //______________________________

    /**
     * Prepare both statements, bind the key values and try to move the row
     * from Pending to Working.
     * @return true only if this child now owns the row
     */
    private boolean claimRow()
    {
      try
      { m_PSsel4U = m_oConn.prepareStatement(m_oParent.selectForUpdStatement());
        m_PSupd   = m_oConn.prepareStatement(m_oParent.updateStatement());

        int iParm=1;
        for (String sCurr : m_oParent.m_saKeys)
        { String sVal = m_oInstP.getAttr(sCurr);
          m_PSsel4U.setString (iParm  ,sVal);
          // parameters are +1 in update statement
          // autoincrement leaves things ready for next SQL parameter
          m_PSupd.setString   (++iParm,sVal);
        }
        return changeStatus(ROW_STATE.Pending,ROW_STATE.Working);
      }
      catch (Exception e)
      { m_oLogger.error("Problems with update statements",e);
        return false;
      }
    } //______________________________

    /**
     * Instantiate the action class with a copy of the group parameters plus
     * this row's values, execute it, and mark the row Done or Error.
     */
    private void processRow()
    {
      ROW_STATE oOutcome = ROW_STATE.Done;
      try
      {
        Constructor<?> oCnst = m_oParent.m_oExecClass
          .getConstructor (new Class[] {DomElement.class});
        DomElement oParms = m_oParent.m_oChParms.cloneObj();
        oParms.addElemChild(m_oInstP);
        Object oInst = oCnst.newInstance (new Object[] {oParms});
        ((EsbAbstractProcessor)oInst).execute();
      }
      catch (Exception e)
      { m_oLogger.error("run() FAILED",e);
        oOutcome = ROW_STATE.Error;
      }

      // Only rows still marked Working are touched, so an action class that
      // already changed (or removed) the row itself is not overridden
      try
      { changeStatus(ROW_STATE.Working,oOutcome);
      }
      catch (Exception e)
      { m_oLogger.error("Unable to set row status to <"+oOutcome+">",e);
      }
    } //______________________________

    /**
     * Lock the row with the "select for update" statement and set its status.
     * The transaction is rolled back when the row is not found or is not in
     * the expected state, and committed after a successful update.
     *
     * NOTE(review): the numeric argument of execQueryWait/execUpdWait is
     * presumably a wait time - confirm against JdbcCleanConn.
     *
     * @param pFrom - state the row must currently be in (null = any state)
     * @param pTo   - state to set
     * @return true if the row was updated, false otherwise
     * @throws Exception - if problems are encountered
     */
    private boolean changeStatus (ROW_STATE pFrom, ROW_STATE pTo) throws Exception
    { ResultSet RS = m_oConn.execQueryWait(m_PSsel4U,5);
      if (! RS.next())
      { m_oConn.rollback();
        return false;
      }
      if (null!=pFrom)
      { // a null or empty status can never match the expected state
        String sOldStatus = RS.getString(1);
        boolean bMatches  = null!=sOldStatus && sOldStatus.length()>0
          && sOldStatus.substring(0,1).equalsIgnoreCase(getStatus(pFrom));
        if (! bMatches)
        { m_oConn.rollback();
          return false;
        }
      }
      m_PSupd.setString(1,getStatus(pTo));
      m_oConn.execUpdWait(m_PSupd,5);
      m_oConn.commit();

      return true;
    } //______________________________

  } //______________________________________________________

} //____________________________________________________________________________ |