From: <jbo...@li...> - 2006-07-05 22:31:58
|
Author: estebanschifman Date: 2006-07-05 18:31:53 -0400 (Wed, 05 Jul 2006) New Revision: 4923 Modified: labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java Log: Change some constants, and add documentation Modified: labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java =================================================================== --- labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-07-05 20:54:37 UTC (rev 4922) +++ labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-07-05 22:31:53 UTC (rev 4923) @@ -51,8 +51,8 @@ public static final String PARM_ACTION_CLASS = "actionClass"; - public static final String PARM_POLL_LTCY = "pollLatency"; - public static final String PARM_RELOAD_LTCY = "reloadLatency"; + public static final String PARM_POLL_LTCY = "pollLatencySecs"; + public static final String PARM_RELOAD_LTCY = "parmsReloadSecs"; public static final String PARM_TOPIC_CONN_FACT = "topicConnFactoryClass"; public static final String PARM_QUIESCE_TOPIC = "quiesceTopic"; Modified: labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java =================================================================== --- labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-07-05 20:54:37 UTC (rev 4922) +++ labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-07-05 22:31:53 UTC (rev 4923) @@ -24,19 +24,102 @@ package org.jboss.soa.esb.listeners; import java.util.*; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; import java.lang.reflect.*; import java.sql.*; import javax.sql.*; +import org.jboss.soa.esb.services.InotificationHandler; +import org.jboss.soa.esb.services.NotificationHandlerFactory; 
import org.jboss.soa.esb.util.*; +import org.jboss.soa.esb.common.EsbSysProps; import org.jboss.soa.esb.helpers.*; import org.jboss.soa.esb.helpers.persist.*; +import org.jboss.soa.esb.notification.NotificationList; import org.jboss.soa.esb.processors.*; - +/** + * SqlTablePoller class + * + * the "main(args)" static method of this class will: + * 1) load the parameters from the Name supplied in args[0] into a DomElement + * (See org.jboss.soa.esb.parameters package for options on parameter repositories) + * 2) for each child element (1st level) of the DomElement, it will try to initiate + * a Thread group that can start a maximum of simultaneous child threads + * "maxThreads" supplied in parameters - default=1 + * 3) each thread group will poll a SQL table with parameters defined in the corresponding + * DomElement (example below has only 1 child element, but you can have several) + * 4) Execution will orderly finish when a message is received in the "quiesceTopic" + * with an optional "quiesceSelector" (please see AbstractPoller class constants) + * + * The SQL table(s) that is (are) polled should have + * 1) a unique key (see "keyFields" parameter) that will be used to update status + * 2) a column to indicate the "processing status" of this trigger row (see ROW_STATE enum) + * this column will be updated by the SqlChildProcess.run() method (see internal + * protected class SqlChildProcess) + * + * Each retrieved row (see OPTIONAL_ATT.whereCondition) should be considered as a trigger + * that is intended to instantiate an object of "actionClass". 
The new instance will + * receive the full DomElement (level 1 for each child group), with an added child + * DomElement (see EsbAbstractProcessor.PARMS_THIS_INSTANCE) containing attributes + * corresponding to the values of the "selectFields" columns in the row that triggered + * the "actionClass" + * + * The ZZDummyProcessor (in this package) is a trivial actionClass included with the + * sole purpose of illustrating what the actionClass will receive, and the entry points + * where users can insert their logic + * + * GOOD LUCK !! + * + * @author Esteban Schifman + */ public class SqlTablePoller extends AbstractPoller { +/* ___________________ Example XML configuration file for a SqlTablePoller_______________ + * +<DocumentElementName> + <ExampleListenChapter + pollLatencySecs="20" + parmsReloadSecs="300" + + maxThreads="2" + + actionClass="org.jboss.soa.esb.listeners.ZZDummyProcessor" + + driver-class="org.postgresql.Driver" + connection-url="jdbc:postgresql://myhost:5432/myDB" + user-name="postgres" + password="" + + tableName="test_notif_table" + selectFields="oid,ref,msg" + keyFields="oid,ref" + inProcessField="statusCol" + whereCondition="src='pepe'" + orderBy="oid desc" + > + <NotificationList type="OK"> + <target class="NotifyFiles"> + <file URI="file:///tmp/jbossEsb/notifyDir/ListenOnNotifTable.notifOK" + append="true" + /> + </target> + </NotificationList> + + <NotificationList type="err"> + <target class="NotifyFiles"> + <file URI="file:///tmp/jbossEsb/notifyDir/ListenOnNotifTable.notifErr" + append="true" + /> + </target> + </NotificationList> + </ExampleListenChapter> +</DocumentElementName> + * + */ + /** - * These are the mandatory attributes needed for any SqlTablePoller + * Mandatory attributes needed for SqlTablePoller * 1) Table name * 2) Comma separated list of fields needed in the ResultSet * 3) Comma separated list of fields that constitute a unique ID of the working row @@ -352,6 +435,7 @@ m_oParent = p_oGrp; this.addObserver(m_oParent); 
setChanged(); + // add 1 to child thread count notifyObservers(new Integer(1)); m_oInstP = p_oP; @@ -360,7 +444,8 @@ } //__________________________________ public void run() - { + { + Exception oAbend = null; try { m_PSsel4U = m_oConn.prepareStatement(m_oParent.selectForUpdStatement()); m_PSupd = m_oConn.prepareStatement(m_oParent.updateStatement()); @@ -373,14 +458,21 @@ // autoincrement leaves things ready for next SQL parameter m_PSupd.setString (++iParm,sVal); } + // will only continue if it can change status to "Working" if (! changeStatus(ROW_STATE.Pending,ROW_STATE.Working)) - return; + oAbend = new Exception("Unable to change status to Working"); } catch (Exception e) { m_oLogger.error("Problems with update statements",e); - return; + if (null!=m_oConn) + { try { m_oConn.rollback(); } + catch (Exception eR) { /* OK do nothing */} + m_oConn.release(); + } + oAbend = e; } + if (null==oAbend) try { Constructor oCnst = m_oParent.m_oExecClass @@ -389,17 +481,30 @@ oParms.addElemChild(m_oInstP); Object oInst = oCnst.newInstance (new Object[] {oParms}); ((EsbAbstractProcessor)oInst).execute(); + changeStatus(ROW_STATE.Working,ROW_STATE.Done); } catch (Exception e) - { m_oLogger.error("run() FAILED",e); + { m_oLogger.error("run() FAILED",e); + try + { m_oConn.rollback(); + changeStatus(null,ROW_STATE.Error); + } + catch (Exception CS) { /* What could we do here ? */} + oAbend = e; } finally { if (null!=m_oConn) m_oConn.release(); } - + + if (null==oAbend) + notifyOK(); + else + notifyError(oAbend); + setChanged(); + // decrease child thread count in parent group notifyObservers(new Integer(-1)); } //______________________________ @@ -420,7 +525,64 @@ return true; } //______________________________ - + + public void notifyOK() + { try + { + String sNotif = getOkNotifContent(); + for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT)) + { NotificationList oNL = new NotificationList(oCurr); + if (! 
oNL.isOK()) continue; + getNotifHandler().sendNotifications(oCurr,sNotif); + } + } + catch (Exception e) {} + } //__________________________________ + + public void notifyError(Exception p_e) + { + String sNotif = getErrorNotifContent(); + ByteArrayOutputStream oBO = new ByteArrayOutputStream(); + PrintStream oPS = new PrintStream(oBO); + try + { oPS.println(sNotif); + if (null != p_e) p_e.printStackTrace(oPS); + oPS.close(); + + String sMsg = oBO.toString(); + for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT)) + { NotificationList oNL = new NotificationList(oCurr); + if (! oNL.isErr()) continue; + getNotifHandler().sendNotifications(oNL,sMsg); + } + } + catch (Exception e) { } + } //__________________________________ + + protected InotificationHandler getNotifHandler() + { + try { return NotificationHandlerFactory.getNotifHandler + ("remote" + ,EsbSysProps.getJndiServerType() + ,EsbSysProps.getJndiServerURL() + ); + } + catch (Exception e) + { m_oLogger.error(formatLogMsg("Notification FAILED"),e); + return null; + } + } //______________________________ + + // These methods are to be overridden by your own derived class + protected String getOkNotifContent() + { + return "Success"; + } + protected String getErrorNotifContent() + { + return "FAILURE"; + } + } //______________________________________________________ } //____________________________________________________________________________ |