Author: trader Date: 2009-06-30 14:02:30 -0700 (Tue, 30 Jun 2009) New Revision: 13429 URL: http://svn.hyperic.org/?view=rev&root=Hyperic+HQ&revision=13429 Modified: branches/HQ_4_1/etc/ehcache.xml branches/HQ_4_1/sql/events/TriggerEvent.hbm.xml branches/HQ_4_1/src/org/hyperic/hq/bizapp/server/trigger/conditional/MultiConditionTrigger.java branches/HQ_4_1/src/org/hyperic/hq/events/server/session/TriggerEventDAO.java branches/HQ_4_1/src/org/hyperic/hq/product/server/mbean/ProductPluginDeployer.java branches/HQ_4_1/src/org/hyperic/util/stats/ConcurrentStatsCollector.java Log: HHQ-3178 and HQ-1773. Reviewed by hq core team. Attempt to workaround MySQL timeout issues in trigger_event queries. Add concurrent stats at QA's request. Modified: branches/HQ_4_1/etc/ehcache.xml =================================================================== --- branches/HQ_4_1/etc/ehcache.xml 2009-06-30 20:00:21 UTC (rev 13428) +++ branches/HQ_4_1/etc/ehcache.xml 2009-06-30 21:02:30 UTC (rev 13429) @@ -658,6 +658,12 @@ timeToIdleSeconds="0" timeToLiveSeconds="0" memoryStoreEvictionPolicy="LRU"/> + <cache name="org.hyperic.hq.events.server.session.TriggerEvent" + maxElementsInMemory="50000" + eternal="true" + timeToIdleSeconds="0" + timeToLiveSeconds="0" + memoryStoreEvictionPolicy="LRU"/> <!-- Group alert caches --> <cache name="org.hyperic.hq.galerts.server.session.GalertDef" Modified: branches/HQ_4_1/sql/events/TriggerEvent.hbm.xml =================================================================== --- branches/HQ_4_1/sql/events/TriggerEvent.hbm.xml 2009-06-30 20:00:21 UTC (rev 13428) +++ branches/HQ_4_1/sql/events/TriggerEvent.hbm.xml 2009-06-30 21:02:30 UTC (rev 13429) @@ -2,8 +2,8 @@ <hibernate-mapping package="org.hyperic.hq.events.server.session"> <!-- We prevent concurrent updates at the application level. Each trigger - reads then updates event objects serially with an exclusive lock. Thus, - we don't need an optimistic locking strategy and the associated overhead. --> + reads then updates event objects serially with an exclusive lock. Thus, + we don't need an optimistic locking strategy and the associated overhead. --> <class name="TriggerEvent" table="EAM_TRIGGER_EVENT" optimistic-lock="none"> <id name="id" type="long"> <column name="ID" not-null="true"/> @@ -12,21 +12,25 @@ </generator> </id> + <properties name="measurementId" unique="false"> + <property name="triggerId" type="integer"> + <column name="TRIGGER_ID" not-null="true" index="TRIGGER_EVENT_TRIGGER_ID_IDX"/> + </property> + <property name="id" index="TRIGGER_EVENT_TRIGGER_ID_IDX"/> + </properties> + <property lazy="true" name="eventObject" type="binary"> <column name="EVENT_OBJECT" not-null="true" length="256"/> </property> - <property name="triggerId" type="integer"> - <column name="TRIGGER_ID" not-null="true" index="TRIGGER_EVENT_TRIGGER_ID_IDX"/> - </property> - <property name="ctime" type="long"> - <column name="CTIME" not-null="true" index="TRIGGER_EVENT_CTIME_IDX"/> + <column name="CTIME" not-null="true"/> </property> <property name="expiration" type="long"> - <column name="EXPIRATION" not-null="true" index="TRIGGER_EVENT_EXPIRATION_IDX"/> + <column name="EXPIRATION" not-null="true"/> </property> </class> </hibernate-mapping> + Modified: branches/HQ_4_1/src/org/hyperic/hq/bizapp/server/trigger/conditional/MultiConditionTrigger.java =================================================================== --- branches/HQ_4_1/src/org/hyperic/hq/bizapp/server/trigger/conditional/MultiConditionTrigger.java 2009-06-30 20:00:21 UTC (rev 13428) +++ branches/HQ_4_1/src/org/hyperic/hq/bizapp/server/trigger/conditional/MultiConditionTrigger.java 2009-06-30 21:02:30 UTC (rev 13429) @@ -65,6 +65,7 @@ import org.hyperic.util.config.InvalidOptionValueException; import org.hyperic.util.config.LongConfigOption; import org.hyperic.util.config.StringConfigOption; +import org.hyperic.util.stats.ConcurrentStatsCollector; /** The MultiConditionTrigger is a specialized trigger that can combine multiple * conditions and only fire actions when all conditions have been met @@ -96,78 +97,13 @@ /** Holds value of property durable. */ private boolean durable; + protected Object monitor; + /** Creates a new instance of MultiConditionTrigger */ public MultiConditionTrigger() { - baton = new Baton(); + monitor = this; } - - protected static class Baton { - // Container class for all things synchronized. This class does no - // synchronization on its own: concurrency correctness must be - // ensured by the calling code. - - // The thread currently holding the baton - Thread processor; - - // The list of events that the baton holder must process - LinkedList eventsToProcess; - - // Prior persisted state, if any. This will usually be empty/null. - LinkedList priorState; - - Baton() { - eventsToProcess = new LinkedList(); - priorState = null; - } - - Thread getProcessor() { - return processor; - } - - /* - * NOTE: returns the actual list reference, not a copy. - */ - LinkedList getEventsToProcess() { - return eventsToProcess; - } - - /* - * NOTE: returns the actual list reference, not a copy. - */ - LinkedList getPriorState() { - return priorState; - } - - void setPriorState(LinkedList priorState) { - this.priorState = priorState; - } - - // Must be done under lock! - boolean grab() { - boolean result = false; - if (processor == null) { - processor = Thread.currentThread(); - result = true; - } else { - if (processor.equals(Thread.currentThread())) { - // Should not happen! - log.error("Illegal monitor state in Baton"); - // But...the best thing to do in this (impossible?) scenario is to proceed - result = true; - } - } - - return result; - } - - // Must be done under lock! - public void release() { - processor = null; - } - } - - protected Baton baton; - + /** * Process an event from the dispatcher. This is the main entrypoint method. * This method must: @@ -205,36 +141,19 @@ log.debug("processEvent for event " + event); } - LinkedList eventsToProcess = getEventsToProcess(eventToProcess); - if (eventsToProcess != null) { - // We have the baton, continue - evaluateEventList(eventsToProcess); + long start = System.currentTimeMillis(); + synchronized (monitor) { + evaluateEvent(eventToProcess); } + long time = System.currentTimeMillis() - start; + ConcurrentStatsCollector.getInstance().addStat(time, + ConcurrentStatsCollector.MULTI_COND_TRIGGER_MON_WAIT); } - private LinkedList getEventsToProcess(AbstractEvent event) { - LinkedList result = null; - synchronized (baton) { - if (baton.grab()) { - // Drain the event queue and continue processing - result = new LinkedList(baton.getEventsToProcess()); - baton.getEventsToProcess().clear(); - result.add(event); - } else { - // Queue the event for the baton holder to process - baton.getEventsToProcess().add(event); - } - } - - return result; - } - /** - * Internal workhorse method. This method is not synchronized because it assumes it is - * running only when it has the baton. Chew through the list of events until it is emptied. - * It is known at the start of this method that eventsToProcess.size() > 0. + * Internal workhorse method, main purpose is to abstract out any locking semantics. */ - void evaluateEventList(LinkedList eventsToProcess) { + void evaluateEvent(AbstractEvent newEvent) { EventTrackerLocal etracker = null; try { @@ -242,105 +161,45 @@ } catch (Exception e) { log.error("Internal error, cannot create event tracker", e); } - - boolean drainedQueue = false; + + LinkedList eventsToProcess = getPersistedReferencedEvents(etracker); + boolean doDebug = log.isDebugEnabled(); Set persistedEventsToDelete = new HashSet(); - // It guaranteed that, if we're here, there is at least one event in the queue - do { - - AbstractEvent current = (AbstractEvent) eventsToProcess.removeFirst(); - - if (doDebug) { - log.debug("evaluating event " + current); - } - - // Evaluate one-by-one. Requirements: - // - // 1. Evaluate events in the order they came in, starting with the event - // at position startFrom. - // 2. Filter out expired events based on the timestamp of the "current" event. - // If this trigger has an expiration of N milliseconds, then any events - // with a timestamp less than (current.getTimestamp() - N) is considered - // to be expired. - // 3. Once triggering conditions are met, fire and get rid of all previous events, - // per the feature requirements for this trigger. - // 4. Leave events that occurred after the triggering event for subsequent - // evaluation. - // 5. A MultiConditionTrigger can fire with a single triggering condition, - // for example, if the conditions are "a OR b" and a fires. This means - // the list of conditions need to be evaluated front-to-back when there - // are any OR conditions. If there are only AND conditions, then the - // minimum is the number of AND conditions. - // - // Other things to note: - // - // - a TriggerNotFiredEvent has no useful meaning if there are no prior - // TriggerFiredEvents for the condition - // - a TriggerFiredEvent followed (not necessarily immediately followed, if - // the TriggerNotFiredEvent occurs before sufficient fulfilling conditions) - // by a TriggerNotFiredEvent for the same condition effectively neutralizes - // both events - if (baton.getPriorState() == null) { - - // First time seeing this trigger. There may be backed-up events - LinkedList persistedStream = getPersistedReferencedEvents(etracker); - baton.setPriorState(persistedStream); - } - + + if (doDebug) { + log.debug("evaluating event " + newEvent); + } + + Collection fulfilled = doEvaluation(eventsToProcess, + newEvent, + etracker, + persistedEventsToDelete); + if (fulfilled != null) { if (doDebug) { - log.debug("Prior state for event " + current + " is " + baton.getPriorState()); + log.debug("Trigger " + this + " firing on event " + newEvent); } - - Collection fulfilled = doSingleEvaluation(baton.getPriorState(), - current, - etracker, - persistedEventsToDelete); - if (fulfilled != null) { - if (doDebug) { - log.debug("Trigger " + this + " firing on event " + current); - } - - persistedEventsToDelete.clear(); - fire(fulfilled, etracker); - } - - if (eventsToProcess.isEmpty()) { - synchronized (baton) { - if (baton.getEventsToProcess().isEmpty()) { - // Looks like we're done here...cleanup and go home - try { - etracker.deleteEvents(persistedEventsToDelete); - } catch (SQLException sqle) { - log.error("Error deleting events for trigger ID " + getId()); - } - if (log.isDebugEnabled()) { - log.debug("MultiConditionTrigger trigger id=" + getId() + - " deleting event set size=" + persistedEventsToDelete.size()); - } - baton.release(); - drainedQueue = true; - } else { - // New events have come in while we were processing - eventsToProcess.addAll(baton.getEventsToProcess()); - baton.getEventsToProcess().clear(); - } - } - } - - if (!drainedQueue) { - if (log.isDebugEnabled()) { - log.debug("Event queue size is " + baton.getEventsToProcess().size() + - ", continuing."); - } - } - - } while (!drainedQueue); + + // Don't bother clearing, fire causes these to be deleted anyway. + persistedEventsToDelete.clear(); + fire(fulfilled, etracker); + } + + // Delete any past invalidated events + try { + etracker.deleteEvents(persistedEventsToDelete); + } catch (SQLException sqle) { + log.error("Error deleting events for trigger ID " + getId(), sqle); + } + if (log.isDebugEnabled()) { + log.debug("MultiConditionTrigger trigger id=" + getId() + + " deleting event set size=" + persistedEventsToDelete.size()); + } } /** * Evaluate this trigger based on a set of prior events and a new, incoming event. This - * method should only be called when the set of prior events is not sufficient to cause + * method will typically only be called when the set of prior events is not sufficient to cause * a trigger fire. * * @param priorEvents A live, mutable collection of prior events that will be @@ -355,7 +214,7 @@ * @return The collection of events that causes this trigger to fire. Null * if the incoming event does not cause the trigger to fire. */ - private Collection doSingleEvaluation(LinkedList priorEvents, + private Collection doEvaluation(LinkedList priorEvents, AbstractEvent event, EventTrackerLocal etracker, Set persistedEventsToDelete) { @@ -369,6 +228,7 @@ } AbstractEvent toUpdate = null; + priorEvents.add(event); // Create a table to keep track Map fulfilled = new LinkedHashMap(); @@ -379,7 +239,8 @@ AbstractEvent tracked = (AbstractEvent) iter.next(); if (tracked.getTimestamp() >= expire) { - if (tracked.getInstanceId().equals(event.getInstanceId())) { + if (tracked != event && + tracked.getInstanceId().equals(event.getInstanceId())) { // If this tracked event equals the new event, then // the old one is obsolete. There can be only one event @@ -395,7 +256,6 @@ // evaluations. In the common case, the list comes from memory, not from // the persisted event stream. This remove only affects the in-memory // stream. The persisted stream is handled by the updateReference() call. - iter.remove(); } else if (tracked instanceof TriggerFiredEvent) { fulfilled.put(tracked.getInstanceId(), tracked); @@ -424,8 +284,7 @@ // as being expired. // INVARIANT: toUpdate != tracked here, because toUpdate is only // assigned a value if tracked is not expired. - deleteEvent(tracked, persistedEventsToDelete); - iter.remove(); + persistedEventsToDelete.add(tracked.getId()); } } @@ -465,14 +324,14 @@ // state means (1) set the event object BLOB to the current event, // and (2) update the expiration time to reflect the current time. // This code will always update a NON-EXPIRED trigger condition. - updateStateForEvent(etracker, priorEvents, event, toUpdate); + updateStateForEvent(etracker, event, toUpdate); // If we updated, we will not track. track = false; } else { // No expiration, but the old state is made obsolete by the new // state. Delete the old state, it will be replaced with the new state. - deleteEvent(toUpdate, persistedEventsToDelete); + persistedEventsToDelete.add(toUpdate.getId()); } } @@ -482,7 +341,12 @@ } if (track) { - addEvent(etracker, priorEvents, event); + try { + etracker.addReference(getId(), event, getTimeRange()); + } catch (SQLException sqle) { + log.error("Failed to add reference for event on trigger ID " + getId()); + } + if (log.isDebugEnabled()) { log.debug("MultiConditionTrigger trigger id=" + getId() + " adding reference."); @@ -528,26 +392,12 @@ } } - protected void addEvent(EventTrackerLocal etracker, - List priorEvents, - AbstractEvent event) { - try { - etracker.addReference(getId(), event, getTimeRange()); - priorEvents.add(event); - } catch (SQLException sqle) { - log.error("Failed to add reference for event on trigger ID " + getId()); - } - } - protected void updateStateForEvent(EventTrackerLocal etracker, - List priorEvents, AbstractEvent event, AbstractEvent toUpdate) { try { - event.setId(toUpdate.getId()); - priorEvents.add(event); etracker.updateReference(getId(), toUpdate.getId(), - toUpdate, getTimeRange()); + event, getTimeRange()); if (log.isDebugEnabled()) { log.debug("MultiConditionTrigger trigger id=" + getId() + " updating references for teid " + toUpdate.getId()); @@ -558,11 +408,6 @@ } } - protected void deleteEvent(AbstractEvent toDelete, - Set persistedEventsToDelete) { - persistedEventsToDelete.add(toDelete.getId()); - } - /** * Publish a TriggerNotFiredEvent. The notFired() method is (as of this writing) * final in the superclass, so this wrapper method exists to separate out that Modified: branches/HQ_4_1/src/org/hyperic/hq/events/server/session/TriggerEventDAO.java =================================================================== --- branches/HQ_4_1/src/org/hyperic/hq/events/server/session/TriggerEventDAO.java 2009-06-30 20:00:21 UTC (rev 13428) +++ branches/HQ_4_1/src/org/hyperic/hq/events/server/session/TriggerEventDAO.java 2009-06-30 21:02:30 UTC (rev 13429) @@ -25,6 +25,8 @@ package org.hyperic.hq.events.server.session; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import org.hibernate.Session; @@ -77,11 +79,19 @@ } List findAllByTriggerId(Integer tid) { - String hql = "from TriggerEvent te where te.triggerId= :tid"; + String hql = "select te.id from TriggerEvent te where " + + "te.triggerId= :tid"; - return createQuery(hql) - .setInteger("tid", tid.intValue()) - .list(); + List list = createQuery(hql) + .setInteger("tid", tid.intValue()) + .list(); + List rtn = new ArrayList(list.size()); + for (Iterator it = list.iterator(); it.hasNext(); ) { + Integer id = (Integer) it.next(); + rtn.add(findById(id)); + } + + return rtn; } int countUnexpiredByTriggerId(Integer tid, Session session) { Modified: branches/HQ_4_1/src/org/hyperic/hq/product/server/mbean/ProductPluginDeployer.java =================================================================== --- branches/HQ_4_1/src/org/hyperic/hq/product/server/mbean/ProductPluginDeployer.java 2009-06-30 20:00:21 UTC (rev 13428) +++ branches/HQ_4_1/src/org/hyperic/hq/product/server/mbean/ProductPluginDeployer.java 2009-06-30 21:02:30 UTC (rev 13429) @@ -254,22 +254,17 @@ ConcurrentStatsCollector c = ConcurrentStatsCollector.getInstance(); c.register( ConcurrentStatsCollector.RUNTIME_PLATFORM_AND_SERVER_MERGER); - c.register( - ConcurrentStatsCollector.AVAIL_MANAGER_METRICS_INSERTED); - c.register( - ConcurrentStatsCollector.DATA_MANAGER_INSERT_TIME); - c.register( - ConcurrentStatsCollector.JMS_TOPIC_PUBLISH_TIME); - c.register( - ConcurrentStatsCollector.JMS_QUEUE_PUBLISH_TIME); - c.register( - ConcurrentStatsCollector.METRIC_DATA_COMPRESS_TIME); - c.register( - ConcurrentStatsCollector.DB_ANALYZE_TIME); - c.register( - ConcurrentStatsCollector.PURGE_EVENT_LOGS_TIME); - c.register( - ConcurrentStatsCollector.PURGE_MEASUREMENTS_TIME); + c.register(ConcurrentStatsCollector.AVAIL_MANAGER_METRICS_INSERTED); + c.register(ConcurrentStatsCollector.DATA_MANAGER_INSERT_TIME); + c.register(ConcurrentStatsCollector.JMS_TOPIC_PUBLISH_TIME); + c.register(ConcurrentStatsCollector.JMS_QUEUE_PUBLISH_TIME); + c.register(ConcurrentStatsCollector.METRIC_DATA_COMPRESS_TIME); + c.register(ConcurrentStatsCollector.DB_ANALYZE_TIME); + c.register(ConcurrentStatsCollector.PURGE_EVENT_LOGS_TIME); + c.register(ConcurrentStatsCollector.PURGE_MEASUREMENTS_TIME); + c.register(ConcurrentStatsCollector.MEASUREMENT_SCHEDULE_TIME); + c.register(ConcurrentStatsCollector.EMAIL_ACTIONS); + c.register(ConcurrentStatsCollector.MULTI_COND_TRIGGER_MON_WAIT); c.startCollector(); } catch (Exception e) { _log.error("Could not start Concurrent Stats Collector", e); Modified: branches/HQ_4_1/src/org/hyperic/util/stats/ConcurrentStatsCollector.java =================================================================== --- branches/HQ_4_1/src/org/hyperic/util/stats/ConcurrentStatsCollector.java 2009-06-30 20:00:21 UTC (rev 13428) +++ branches/HQ_4_1/src/org/hyperic/util/stats/ConcurrentStatsCollector.java 2009-06-30 21:02:30 UTC (rev 13429) @@ -77,7 +77,7 @@ new ScheduledThreadPoolExecutor(1); private static final ConcurrentStatsCollector _instance = new ConcurrentStatsCollector(); - private static final int WRITE_PERIOD = 15; + public static final int WRITE_PERIOD = 15; private final Sigar _sigar = new Sigar(); private Long _pid; public static final String JVM_TOTAL_MEMORY = "JVM_TOTAL_MEMORY", @@ -103,25 +103,34 @@ METRIC_DATA_COMPRESS_TIME = "METRIC_DATA_COMPRESS_TIME", DB_ANALYZE_TIME = "DB_ANALYZE_TIME", PURGE_EVENT_LOGS_TIME = "PURGE_EVENT_LOGS_TIME", - PURGE_MEASUREMENTS_TIME = "PURGE_MEASUREMENTS_TIME"; + PURGE_MEASUREMENTS_TIME = "PURGE_MEASUREMENTS_TIME", + MEASUREMENT_SCHEDULE_TIME = "MEASUREMENT_SCHEDULE_TIME", + EMAIL_ACTIONS = "EMAIL_ACTIONS", + MULTI_COND_TRIGGER_MON_WAIT = "MULTI_COND_TRIGGER_MON_WAIT"; // using tree due to ordering capabilities private final Map _statKeys = new TreeMap(); private AtomicBoolean _hasStarted = new AtomicBoolean(false); - private final MBeanServer _mbeanServer = MBeanUtil.getMBeanServer(); - + private final MBeanServer _mbeanServer; + private ConcurrentStatsCollector() { - final char fs = File.separatorChar; - final String jbossLogSuffix = - "server" + fs + "default" + fs + "log" + fs + "hqstats" + fs; - final String d = - HQApp.getInstance().getRestartStorageDir().getAbsolutePath(); - _baseDir = d + fs + jbossLogSuffix; - _log.info("using hqstats baseDir " + _baseDir); - final File dir = new File(_baseDir); - if (!dir.exists()) { - dir.mkdir(); - } - registerInternalStats(); + final char fs = File.separatorChar; + if (System.getProperty("hq.unittest.run") != null) { + final String d = + HQApp.getInstance().getRestartStorageDir().getAbsolutePath(); + final String jbossLogSuffix = + "server" + fs + "default" + fs + "log" + fs + "hqstats" + fs; + _baseDir = d + fs + jbossLogSuffix; + _log.info("using hqstats baseDir " + _baseDir); + final File dir = new File(_baseDir); + if (!dir.exists()) { + dir.mkdir(); + } + _mbeanServer = MBeanUtil.getMBeanServer(); + registerInternalStats(); + } else { + _mbeanServer = null; + _baseDir = null; + } } public final void register(final String statId) { @@ -234,7 +243,7 @@ final String monthStr = (month < 10) ? "0"+month : String.valueOf(month); final int day = cal.get(Calendar.DAY_OF_MONTH); final String dayStr = (day < 10) ? "0"+day : String.valueOf(day); - String rtn = BASE_FILENAME+"-"+monthStr+"-"+dayStr+".csv"; + String rtn = BASE_FILENAME+"-"+monthStr+"-"+dayStr; if (withTimestamp) { final int hour = cal.get(Calendar.HOUR_OF_DAY); final String hourStr = (hour < 10) ? "0"+hour : String.valueOf(hour); @@ -242,9 +251,9 @@ final String minStr = (min < 10) ? "0"+min : String.valueOf(min); final int sec = cal.get(Calendar.SECOND); final String secStr = (sec < 10) ? "0"+sec : String.valueOf(sec); - rtn = rtn+"-"+hourStr+":"+minStr+":"+secStr; + rtn = rtn+"-"+hourStr+"."+minStr+"."+secStr; } - return _baseDir + rtn; + return _baseDir + rtn + ".csv"; } public static final ConcurrentStatsCollector getInstance() { |