|
From: <jbo...@li...> - 2006-07-05 22:31:58
|
Author: estebanschifman
Date: 2006-07-05 18:31:53 -0400 (Wed, 05 Jul 2006)
New Revision: 4923
Modified:
labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
Log:
Change some constants, and add documentation
Modified: labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-07-05 20:54:37 UTC (rev 4922)
+++ labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-07-05 22:31:53 UTC (rev 4923)
@@ -51,8 +51,8 @@
public static final String PARM_ACTION_CLASS = "actionClass";
- public static final String PARM_POLL_LTCY = "pollLatency";
- public static final String PARM_RELOAD_LTCY = "reloadLatency";
+ public static final String PARM_POLL_LTCY = "pollLatencySecs";
+ public static final String PARM_RELOAD_LTCY = "parmsReloadSecs";
public static final String PARM_TOPIC_CONN_FACT = "topicConnFactoryClass";
public static final String PARM_QUIESCE_TOPIC = "quiesceTopic";
Modified: labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
===================================================================
--- labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-07-05 20:54:37 UTC (rev 4922)
+++ labs/jbossesb/trunk/ESBCore/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-07-05 22:31:53 UTC (rev 4923)
@@ -24,19 +24,102 @@
package org.jboss.soa.esb.listeners;
import java.util.*;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
import java.lang.reflect.*;
import java.sql.*;
import javax.sql.*;
+import org.jboss.soa.esb.services.InotificationHandler;
+import org.jboss.soa.esb.services.NotificationHandlerFactory;
import org.jboss.soa.esb.util.*;
+import org.jboss.soa.esb.common.EsbSysProps;
import org.jboss.soa.esb.helpers.*;
import org.jboss.soa.esb.helpers.persist.*;
+import org.jboss.soa.esb.notification.NotificationList;
import org.jboss.soa.esb.processors.*;
-
+/**
+ * SqlTablePoller class
+ *
+ * the "main(args)" static method of this class will:
+ * 1) load the parameters from the Name supplied in args[0] into a DomElement
+ * (See org.jboss.soa.esb.parameters package for options on parameter repositories)
+ * 2) for each child element (1st level) of the DomElement, it will try to initiate
+ * a Thread group that can start a maximum of simultaneous child threads
+ * "maxThreads" supplied in parameters - default=1
+ * 3) each thread group will poll a SQL table with parameters defined in the corresponding
+ * DomElement (example below has only 1 child element, but you can have several)
+ * 4) Execution will orderly finish when a message is received in the "quiesceTopic"
+ * with an optional "quiesceSelector" (please see AbstractPoller class constants)
+ *
+ * The SQL table(s) that is (are) polled should have
+ * 1) a unique key (see "keyFields" parameter) that will be used to update status
+ * 2) a column to indicate the "processing status" of this trigger row (see ROW_STATE enum)
+ * this column will be updated by the SqlChildProcess.run() method (see internal
+ * protected class SqlChildProcess)
+ *
+ * Each retrieved row (see OPTIONAL_ATT.whereCondition) should be considered as a trigger
+ * that is intended to instantiate an object of "actionClass". The new instance will
+ * receive the full DomElement (level 1 for each child group), with an added child
+ * DomElement (see EsbAbstractProcessor.PARMS_THIS_INSTANCE) containing attributes
+ * corresponding to the values of the "selectFields" columns in the row that triggered
+ * the "actionClass"
+ *
+ * The ZZDummyProcessor (in this package) is a trivial actionClass included with the
+ * sole purpose of illustrating what the actionClass will receive, and the entry points
+ * where users can insert their logic
+ *
+ * GOOD LUCK !!
+ *
+ * @author Esteban Schifman
+ */
public class SqlTablePoller extends AbstractPoller
{
+/* ___________________ Example XML configuration file for a SqlTablePoller_______________
+ *
+<DocumentElementName>
+ <ExampleListenChapter
+ pollLatencySecs="20"
+ parmsReloadSecs="300"
+
+ maxThreads="2"
+
+ actionClass="org.jboss.soa.esb.listeners.ZZDummyProcessor"
+
+ driver-class="org.postgresql.Driver"
+ connection-url="jdbc:postgresql://myhost:5432/myDB"
+ user-name="postgres"
+ password=""
+
+ tableName="test_notif_table"
+ selectFields="oid,ref,msg"
+ keyFields="oid,ref"
+ inProcessField="statusCol"
+ whereCondition="src='pepe'"
+ orderBy="oid desc"
+ >
+ <NotificationList type="OK">
+ <target class="NotifyFiles">
+ <file URI="file:///tmp/jbossEsb/notifyDir/ListenOnNotifTable.notifOK"
+ append="true"
+ />
+ </target>
+ </NotificationList>
+
+ <NotificationList type="err">
+ <target class="NotifyFiles">
+ <file URI="file:///tmp/jbossEsb/notifyDir/ListenOnNotifTable.notifErr"
+ append="true"
+ />
+ </target>
+ </NotificationList>
+ </ExampleListenChapter>
+</DocumentElementName>
+ *
+ */
+
/**
- * These are the mandatory attributes needed for any SqlTablePoller
+ * Mandatory attributes needed for SqlTablePoller
* 1) Table name
* 2) Comma separated list of fields needed in the ResultSet
* 3) Comma separated list of fields that constitute a unique ID of the working row
@@ -352,6 +435,7 @@
m_oParent = p_oGrp;
this.addObserver(m_oParent);
setChanged();
+ // add 1 to child thread count
notifyObservers(new Integer(1));
m_oInstP = p_oP;
@@ -360,7 +444,8 @@
} //__________________________________
public void run()
- {
+ {
+ Exception oAbend = null;
try
{ m_PSsel4U = m_oConn.prepareStatement(m_oParent.selectForUpdStatement());
m_PSupd = m_oConn.prepareStatement(m_oParent.updateStatement());
@@ -373,14 +458,21 @@
// autoincrement leaves things ready for next SQL parameter
m_PSupd.setString (++iParm,sVal);
}
+ // will only continue if it can change status to "Working"
if (! changeStatus(ROW_STATE.Pending,ROW_STATE.Working))
- return;
+ oAbend = new Exception("Unable to change status to Working");
}
catch (Exception e)
{ m_oLogger.error("Problems with update statements",e);
- return;
+ if (null!=m_oConn)
+ { try { m_oConn.rollback(); }
+ catch (Exception eR) { /* OK do nothing */}
+ m_oConn.release();
+ }
+ oAbend = e;
}
+ if (null==oAbend)
try
{
Constructor oCnst = m_oParent.m_oExecClass
@@ -389,17 +481,30 @@
oParms.addElemChild(m_oInstP);
Object oInst = oCnst.newInstance (new Object[] {oParms});
((EsbAbstractProcessor)oInst).execute();
+ changeStatus(ROW_STATE.Working,ROW_STATE.Done);
}
catch (Exception e)
- { m_oLogger.error("run() FAILED",e);
+ { m_oLogger.error("run() FAILED",e);
+ try
+ { m_oConn.rollback();
+ changeStatus(null,ROW_STATE.Error);
+ }
+ catch (Exception CS) { /* What could we do here ? */}
+ oAbend = e;
}
finally
{ if (null!=m_oConn)
m_oConn.release();
}
-
+
+ if (null==oAbend)
+ notifyOK();
+ else
+ notifyError(oAbend);
+
setChanged();
+ // decrease child thread count in parent group
notifyObservers(new Integer(-1));
} //______________________________
@@ -420,7 +525,64 @@
return true;
} //______________________________
-
+
+ public void notifyOK()
+ { try
+ {
+ String sNotif = getOkNotifContent();
+ for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
+ { NotificationList oNL = new NotificationList(oCurr);
+ if (! oNL.isOK()) continue;
+ getNotifHandler().sendNotifications(oCurr,sNotif);
+ }
+ }
+ catch (Exception e) {}
+ } //__________________________________
+
+ public void notifyError(Exception p_e)
+ {
+ String sNotif = getErrorNotifContent();
+ ByteArrayOutputStream oBO = new ByteArrayOutputStream();
+ PrintStream oPS = new PrintStream(oBO);
+ try
+ { oPS.println(sNotif);
+ if (null != p_e) p_e.printStackTrace(oPS);
+ oPS.close();
+
+ String sMsg = oBO.toString();
+ for (DomElement oCurr : m_oParms.getElementChildren(NotificationList.ELEMENT))
+ { NotificationList oNL = new NotificationList(oCurr);
+ if (! oNL.isErr()) continue;
+ getNotifHandler().sendNotifications(oNL,sMsg);
+ }
+ }
+ catch (Exception e) { }
+ } //__________________________________
+
+ protected InotificationHandler getNotifHandler()
+ {
+ try { return NotificationHandlerFactory.getNotifHandler
+ ("remote"
+ ,EsbSysProps.getJndiServerType()
+ ,EsbSysProps.getJndiServerURL()
+ );
+ }
+ catch (Exception e)
+ { m_oLogger.error(formatLogMsg("Notification FAILED"),e);
+ return null;
+ }
+ } //______________________________
+
+ // These methods to be overriden by you own derived class
+ protected String getOkNotifContent()
+ {
+ return "Success";
+ }
+ protected String getErrorNotifContent()
+ {
+ return "FAILURE";
+ }
+
} //______________________________________________________
} //____________________________________________________________________________
|