|
From: Mike H. <he...@us...> - 2002-12-03 15:07:28
|
Update of /cvsroot/velcro/velcro/main/src/java/velcro/broadcaster
In directory sc8-pr-cvs1:/tmp/cvs-serv16650/velcro/broadcaster
Modified Files:
MessageBroadcasterEJB.java
Log Message:
Removed JMS stuff.
Added initial message routing loops.
Added message table loading.
Index: MessageBroadcasterEJB.java
===================================================================
RCS file: /cvsroot/velcro/velcro/main/src/java/velcro/broadcaster/MessageBroadcasterEJB.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** MessageBroadcasterEJB.java 16 Oct 2002 04:47:51 -0000 1.1
--- MessageBroadcasterEJB.java 3 Dec 2002 15:07:25 -0000 1.2
***************
*** 29,40 ****
package velcro.broadcaster;
! import javax.ejb.MessageDrivenBean;
! import javax.ejb.MessageDrivenContext;
import javax.ejb.SessionBean;
import javax.ejb.SessionContext;
! import javax.jms.JMSException;
! import javax.jms.Message;
! import javax.jms.MessageListener;
! import javax.jms.ObjectMessage;
import org.apache.commons.logging.Log;
--- 29,46 ----
package velcro.broadcaster;
! import java.util.Collection;
! import java.util.HashMap;
! import java.util.Iterator;
! import java.util.Map;
! import java.util.TreeMap;
! import java.util.TreeSet;
!
! import javax.ejb.CreateException;
import javax.ejb.SessionBean;
import javax.ejb.SessionContext;
! import javax.naming.InitialContext;
! import javax.naming.NameClassPair;
! import javax.naming.NamingEnumeration;
! import javax.naming.NamingException;
import org.apache.commons.logging.Log;
***************
*** 43,103 ****
import velcro.messaging.ActionMessage;
! public class MessageBroadcasterEJB implements SessionBean, MessageDrivenBean, MessageListener {
! public static final String LOG_NAME = "Velcro Message Broadcaster";
! protected SessionContext mContext;
! protected MessageDrivenContext mMsgContext;
! protected Log mLogger;
public MessageBroadcasterEJB() {
! mLogger = LogFactory.getLog(LOG_NAME);
}
public void ejbCreate() {
! mLogger.trace("In ejbCreate");
}
public void ejbRemove() {
! mLogger.trace("In ejbRemove");
}
public void ejbActivate() {
! mLogger.trace("In ejbActivate");
}
public void ejbPassivate() {
! mLogger.trace("In ejbPassivate");
}
public void setSessionContext(SessionContext context) {
! mLogger.trace("Setting session context");
!
! mContext = context;
! }
!
! public void setMessageDrivenContext(MessageDrivenContext context) {
! mLogger.trace("Setting message driven context");
!
! mMsgContext = context;
! }
!
! public void onMessage(Message message) {
! mLogger.trace("Received JMS message");
! try {
! //Extract action message object from JMS message.
! ObjectMessage om = (ObjectMessage)message;
! ActionMessage am = (ActionMessage)om.getObject();
!
! if (mLogger.isInfoEnabled()) {
! mLogger.info("Received Action Message through JMS of type " + am.getType());
! }
!
! // Process the message
! broadcast(am);
! } catch (JMSException e) {
! mLogger.fatal("Error extracting ActionMessage from JMS message", e);
! }
}
--- 49,122 ----
import velcro.messaging.ActionMessage;
! public class MessageBroadcasterEJB implements SessionBean {
! /**
! * JNDI location containing the message processor entries.
! */
! public static final String MESSAGE_TABLE_LOCATION = "java:comp/env/messages";
! public static final String ENV_DEFAULT_PRIORITY = "java:comp/env/defaultPriority";
! public static final String JNDI_LOCATION_PROCESSOR_EJBS = "java:comp/env";
! public static final int STATIC_DEFAULT_PRIORITY = 50;
! public static final int STATIC_DEFAULT_LOOP_COUNT = 100;
! public static final String PROCESSOR_ENTRY_EJB_NAME = "ejbName";
! public static final String PROCESSOR_ENTRY_PRIORITY = "priority";
! public static final String PROCESSOR_ENTRY_BROADCAST_MODE = "broadcastMode";
! public static final String BROADCAST_MODE_BROADCAST = "broadcast";
! public static final String BROADCAST_MODE_CONTROLLER = "controller";
!
! protected Log log = LogFactory.getLog(this.getClass());
!
! protected int DEFAULT_PRIORITY;
!
! protected SessionContext context;
!
! protected Map processors = new TreeMap();
!
! /**
! * Used to store all the session bean instances so Stateful session beans
! * can be used with velcro.
! */
! private Map processorMap;
public MessageBroadcasterEJB() {
! processorMap = new HashMap();
}
public void ejbCreate() {
! log.trace("In ejbCreate");
!
! try {
! InitialContext ic = new InitialContext();
! Integer dp = (Integer)ic.lookup(ENV_DEFAULT_PRIORITY);
! DEFAULT_PRIORITY = dp.intValue();
! } catch (NamingException ne) {
! log.debug("'" + ENV_DEFAULT_PRIORITY + "' not found.");
! DEFAULT_PRIORITY = STATIC_DEFAULT_PRIORITY;
! }
!
! if (log.isDebugEnabled()) {
! log.debug("Default priority set to: " + DEFAULT_PRIORITY);
! }
!
! updateMessageEntryTable();
}
public void ejbRemove() {
! log.trace("In ejbRemove");
}
public void ejbActivate() {
! log.trace("In ejbActivate");
}
public void ejbPassivate() {
! log.trace("In ejbPassivate");
}
public void setSessionContext(SessionContext context) {
! log.trace("Setting session context");
! this.context = context;
}
***************
*** 112,123 ****
*/
public ActionMessage broadcast(ActionMessage message) {
! if (mLogger.isInfoEnabled()) {
! mLogger.info("Processing message of type " + message.getType());
}
!
return message;
}
public void queue(ActionMessage message) {
}
--- 131,340 ----
*/
public ActionMessage broadcast(ActionMessage message) {
! // Ensure the processor map is empty.
! processorMap.clear();
!
! if (log.isDebugEnabled()) {
! // TODO: Internationalize debug message.
! log.debug("Broadcasting message of type " + message.getType());
}
!
! int loopCount = 0;
! while (!message.getPhase().equals(ActionMessage.PHASE_DONE)) {
! if (loopCount > STATIC_DEFAULT_LOOP_COUNT) {
! // TODO: Internationalize these strings.
! log.debug("Infinite loop check triggered.");
! throw new RuntimeException("Max loop count exceded");
! }
! loopCount++;
! String type = message.getType();
! String phase = null;
! if (!processors.containsKey(type)) {
! // TODO: Create an exception for this (and internationalize it)
! log.debug("No registered processors for type: '" + message.getType() + "'");
! throw new RuntimeException("No registered processors for type: '" + message.getType() + "'");
! }
! Map processorPhases = (Map)processors.get(type);
! phase = message.getPhase();
! if (!processorPhases.containsKey(phase)) {
! // TODO: Create custom exception for this.
! log.debug("There are no processors registered for phase: '" + message.getPhase() + "'");
! throw new RuntimeException("There are no processors registered for phase: '" + message.getPhase() + "'");
! }
! BroadcastModeContainer modes = (BroadcastModeContainer)processorPhases.get(phase);
!
! Collection controllers = modes.getControllers();
! Collection broadcasters = modes.getBroadcasters();
! if (controllers.isEmpty()) {
! // TODO: Internationalize this.
! log.warn("There are no controller processors for message: " + message);
! }
!
! log.trace("Iterating through broadcasters.");
! message = iterateProcessors(broadcasters, message);
! log.trace("Iterating through controllers.");
! message = iterateProcessors(controllers, message);
!
! }
!
! // TODO: Properly empty processorMap
!
return message;
}
public void queue(ActionMessage message) {
+ log.trace("Queuing message");
+ }
+
+ protected ActionMessage iterateProcessors(Collection processors, ActionMessage message) {
+ try {
+ InitialContext ic = new InitialContext();
+ Iterator i = processors.iterator();
+ while (i.hasNext()) {
+ MessageProcessorEntry mpe = (MessageProcessorEntry)i.next();
+
+ if (log.isTraceEnabled()) {
+ log.trace("PROCESSOR ENTRY -- Type: " + mpe.getType() + " Phase: " + mpe.getPhase() + " EJB Name: " + mpe.getEjbName());
+ log.trace(" Pre-process MESSAGE -- " + message);
+ }
+
+ String ejbName = mpe.getEjbName();
+ if (!processorMap.containsKey(ejbName)) {
+ //TODO: Add checking for remote or local interface.
+ String jndiName = JNDI_LOCATION_PROCESSOR_EJBS + "/" + ejbName;
+ if (log.isTraceEnabled()) {
+ log.trace("Looking up message processor at: " + jndiName);
+ }
+ MessageProcessorLocalHome processorHome = (MessageProcessorLocalHome)ic.lookup(jndiName);
+ MessageProcessorLocal processor = processorHome.create();
+ processorMap.put(ejbName, processor);
+ }
+ //TODO: Add checking for remote or local interface.
+ MessageProcessorLocal processor = (MessageProcessorLocal)processorMap.get(ejbName);
+ message = processor.processMessage(message);
+
+ if (log.isTraceEnabled()) {
+ log.trace(" Post-process MESSAGE -- " + message);
+ }
+ }
+
+ return message;
+ } catch (NamingException ne) {
+ // TODO: Create custom exception for this and internationalize.
+ log.debug(ne);
+ throw new RuntimeException("Naming error", ne);
+ } catch (CreateException ce) {
+ // TODO: Internationalize this message
+ log.debug(ce);
+ throw new RuntimeException("Could not create message processor EJB", ce);
+ }
+ }
+
+ protected void updateMessageEntryTable() {
+ try {
+ log.trace("Updating message entry table");
+ InitialContext ic = new InitialContext();
+
+ // Loop through all types
+ NamingEnumeration types = ic.list(MESSAGE_TABLE_LOCATION);
+ while(types.hasMore()) {
+ NameClassPair typePair = (NameClassPair)types.next();
+ String type = typePair.getName();
+
+ if (log.isTraceEnabled()) {
+ log.trace("Looking up phases with message type '" + type + "'");
+ }
+
+ String phasesLocation = MESSAGE_TABLE_LOCATION + "/" + type;
+
+ if (log.isTraceEnabled()) {
+ log.trace("Looking up phases from " + phasesLocation);
+ }
+
+ // Loop through all the phases for the current type
+ NamingEnumeration phases = ic.list(phasesLocation);
+ while(phases.hasMore()) {
+ NameClassPair phasePair = (NameClassPair)phases.next();
+ String phase = phasePair.getName();
+
+ if (log.isTraceEnabled()) {
+ log.trace("Looking up message processors for type: '" + type + "' and phase: '" + phase + "'");
+ }
+
+ String processorLocation = phasesLocation + "/" + phase;
+ if (log.isTraceEnabled()) {
+ log.trace("Looking up processor parameters from " + processorLocation);
+ }
+
+ NamingEnumeration processorNames = ic.list(processorLocation);
+ while(processorNames.hasMore()) {
+ NameClassPair processorNamePair = (NameClassPair)processorNames.next();
+ String processorName = processorNamePair.getName();
+
+ String processorEntriesLocation = processorLocation + "/" + processorName;
+
+ String ejbName;
+ int priority;
+
+ // Lookup ejbName
+ ejbName = (String)ic.lookup(processorEntriesLocation + "/" + PROCESSOR_ENTRY_EJB_NAME);
+ if (log.isTraceEnabled()) {
+ log.trace("Looking up processor params for: '" + ejbName + "'");
+ }
+
+ // Lookup priority
+ try {
+ // Try block here is for looking up the priority. If the priority isn't
+ // found (throws an exception) uses the default.
+ Integer p = (Integer)ic.lookup(processorEntriesLocation + "/" + PROCESSOR_ENTRY_PRIORITY);
+ priority = p.intValue();
+ } catch (NamingException ne) {
+ // TODO: Internationalize this string.
+ log.debug("Priority not set, using default.");
+ priority = DEFAULT_PRIORITY;
+ }
+
+ MessageProcessorEntry mpe = new MessageProcessorEntry(type, phase, ejbName, priority);
+
+ // Check if the type is already in processors map.
+ if (!processors.containsKey(type)) {
+ // Add phases map.
+ processors.put(type, new TreeMap());
+ }
+ Map phasesMap = (Map)processors.get(type);
+ if (!phasesMap.containsKey(phase)) {
+ Collection broadcasters = new TreeSet(new MessageProcessorEntryComparator());
+ Collection controllers = new TreeSet(new MessageProcessorEntryComparator());
+ BroadcastModeContainer bmc = new BroadcastModeContainer(broadcasters, controllers);
+ phasesMap.put(phase, bmc);
+ }
+ BroadcastModeContainer processorsList = (BroadcastModeContainer)phasesMap.get(phase);
+
+ // Check to see this processor is to be handled as a controller or a broadcaster.
+ try {
+ String broadcastMode = (String)ic.lookup(processorEntriesLocation + "/" + PROCESSOR_ENTRY_BROADCAST_MODE);
+ if (broadcastMode.compareToIgnoreCase(BROADCAST_MODE_CONTROLLER) == 0) {
+ processorsList.getControllers().add(mpe);
+ } else {
+ // Defaults to broadcaster.
+
+ // Make sure the following code matches the adding for a broadcaster in the
+ // catch block below.
+ processorsList.getBroadcasters().add(mpe);
+ }
+ } catch (NamingException ne) {
+ // TODO: Internationalize this string.
+ log.debug("Broadcast mode not set, defaulting to broadcast.");
+
+ // Make sure the following code matches the adding for a broadcaster in the above
+ // try block.
+ processorsList.getBroadcasters().add(mpe);
+ }
+ } // Processor names loop
+ }// Phases loop
+ } // Types loop
+ } catch (NamingException e) {
+ log.error(e);
+ throw new RuntimeException(e);
+ }
}
***************
*** 126,129 ****
--- 343,351 ----
/*
* $Log$
+ * Revision 1.2 2002/12/03 15:07:25 heathm
+ * Removed JMS stuff.
+ * Added initial message routing loops.
+ * Added message table loading.
+ *
* Revision 1.1 2002/10/16 04:47:51 heathm
* Moved source tree from /main/src to /main/src/java.
|