|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:41
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21526/src/edu/harvard/syrah/sbon/async Added Files: EL.java Barrier.java LoopIt.java CallbacksIF.java Sync.java EventLoopIF.java Loop.java CBResult.java CBQueue.java Config.java Log Message: Initial commit of AsyncJ project needed for Pyxida --- NEW FILE: EL.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 6, 2005 */ package edu.harvard.syrah.sbon.async; import static edu.harvard.syrah.sbon.async.EL.Priority.HIGH; import static edu.harvard.syrah.sbon.async.EL.Priority.LOW; import static edu.harvard.syrah.sbon.async.EL.Priority.NORMAL; import java.io.IOException; import java.nio.channels.*; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.PriorityBlockingQueue; import edu.harvard.syrah.prp.ANSI; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.prp.POut; import edu.harvard.syrah.prp.PTimer; import edu.harvard.syrah.prp.ANSI.Color; import edu.harvard.syrah.sbon.async.CallbacksIF.*; /** * * Implementation of the main event loop. * * This class is modeled after the Java implementation of David Mazieres' * libasync library, which was done by Sean Rhea as part of the Bamboo * implementation. * */ public class EL implements EventLoopIF { protected static final Log log = new Log(EL.class); public int SELECTION_KEY_ALL_OPS = SelectionKey.OP_ACCEPT | SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE; // Maximum time that we can spend servicing the event queue without accessing // the network private static final long MAX_EVENT_QUEUE_PERIOD = 10; // static final long MAX_CB_TIME = 0; // was 5ms // Maximum size for the event queue private static final long EVENT_QUEUE_LIMIT = 1000; private static final boolean DEFAULT_SHOW_CB_TIME = true; private static final long DEFAULT_STATE_DUMP_INTERVAL = 0; private static final boolean DEFAULT_SHOW_IDLE = false; private boolean showIdle; private boolean showCBTime; private static final long NO_SLACKTIME = 0; private static final long INFINITE_SLACKTIME = Long.MAX_VALUE; public enum Priority { HIGH, NORMAL, LOW; } private Queue<CB0> now_eQ_HP = new ConcurrentLinkedQueue<CB0>(); private Queue<CB0> now_eQ_NP = new ConcurrentLinkedQueue<CB0>(); private Queue<CB0> now_eQ_LP = new ConcurrentLinkedQueue<CB0>(); private Queue<CB0> eQ_HP = new PriorityBlockingQueue<CB0>(); private Queue<CB0> eQ_NP = new PriorityBlockingQueue<CB0>(); private Queue<CB0> eQ_LP = new PriorityBlockingQueue<CB0>(); private Selector selector; private Map<SelectableChannel, ChannelSub> channelSubTable = new HashMap<SelectableChannel, ChannelSub>(); private boolean loopExit = false; private boolean forceExit = false; private boolean ranShutdown = false; private int numSelectorKeys; private static EventLoopIF eventLoop = null; private int max_eQ_HP; private int max_eQ_NP; private int max_eQ_LP; private int max_now_eQ_HP; private int max_now_eQ_NP; private int max_now_eQ_LP; public static void set(EventLoopIF newEventLoop) { eventLoop = newEventLoop; } public static EventLoopIF get() { return eventLoop; } public EL() { this(DEFAULT_STATE_DUMP_INTERVAL, DEFAULT_SHOW_IDLE); } public EL(final long stateDumpInterval, final boolean showIdle) { Thread.currentThread().setPriority(Thread.MAX_PRIORITY); this.showIdle = showIdle; this.showCBTime = DEFAULT_SHOW_CB_TIME; log.debug("Opening the selector..."); try { selector = Selector.open(); } catch (IOException e) { log.error("Could not open selector: " + e); } /* * TODO * * This is sometimes not executed on shutdown and I don't understand why. * This might be connected to ant...? */ // Make sure that we shutdown cleanly Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { EL.this.shutdown(); } }); if (stateDumpInterval != 0) { this.registerTimerCB(stateDumpInterval, new CB0("ELStateDumper") { protected void cb(CBResult result) { dumpState(true); if (!shouldExit()) EL.this.registerTimerCB(stateDumpInterval, this); } }); } } public void dumpState(boolean eventQueueDump) { log.info("now_eQ_HP.size=" + now_eQ_HP.size() + " now_eQ_HP.max=" + max_now_eQ_HP + (eventQueueDump ? " now_eq_HP=" + POut.toString(now_eQ_HP) : "")); log.info("now_eQ_NP.size=" + now_eQ_NP.size() + " now_eQ_NP.max=" + max_now_eQ_NP + (eventQueueDump ? " now_eq_NP=" + POut.toString(now_eQ_NP) : "")); log.info("now_eQ_LP.size=" + now_eQ_LP.size() + " now_eQ_LP_max=" + max_now_eQ_LP + (eventQueueDump ? " now_eq_LP=" + POut.toString(now_eQ_LP) : "")); log.info("eQ_HP.size=" + eQ_HP.size() + " eQ_HP.max=" + max_eQ_HP + (eventQueueDump ? " eq_HP=" + POut.toString(eQ_HP) : "")); log.info("eQ_NP.size=" + eQ_NP.size() + " eQ_NP.max=" + max_eQ_NP + (eventQueueDump ? " eq_NP=" + POut.toString(eQ_NP) : "")); log.info("eQ_LP.size=" + eQ_LP.size() + " eQ_LP_max=" + max_eQ_LP + (eventQueueDump ? " eq_LP=" + POut.toString(eQ_LP) : "")); log.info("cST.size=" + channelSubTable.size() + " cST=" + POut.toString(channelSubTable) + " numSelectorKeys=" + numSelectorKeys); log.info("totalMem=" + POut.toString(((double) Runtime.getRuntime().totalMemory()) / (1024 * 1024)) + " MB" + " freeMem=" + POut.toString(((double) Runtime.getRuntime().freeMemory()) / (1024 * 1024)) + " MB"); // log.main("eQ_NP.peek=" + eQ_NP.remove()); // log.main("eQ_NP.peek=" + eQ_NP.remove()); // log.main("eQ_NP.peek=" + eQ_NP.remove()); max_eQ_HP = eQ_HP.size() > max_eQ_HP ? eQ_HP.size() : max_eQ_HP; max_eQ_NP = eQ_NP.size() > max_eQ_NP ? eQ_NP.size() : max_eQ_NP; max_eQ_LP = eQ_LP.size() > max_eQ_LP ? eQ_LP.size() : max_eQ_LP; max_now_eQ_HP = now_eQ_HP.size() > max_now_eQ_HP ? now_eQ_HP.size() : max_now_eQ_HP; max_now_eQ_NP = now_eQ_NP.size() > max_now_eQ_NP ? now_eQ_NP.size() : max_now_eQ_NP; max_now_eQ_LP = now_eQ_LP.size() > max_now_eQ_LP ? now_eQ_LP.size() : max_now_eQ_LP; } /* * Effectively inserts an event into the event queue * * (non-Javadoc) * * @see edu.harvard.syrah.sbon.async.EventLoopIF#registerTimerEvent(edu.harvard.syrah.sbon.async.CB0) */ public CB0 registerTimerCB(CB0 cb) { return registerTimerCB(cb, Priority.NORMAL); } public CB0 registerTimerCB(CB0 cb, Priority priority) { return registerTimerCB(0, cb, priority); } public CB0 registerTimerCB(long delay, CB0 cb) { return registerTimerCB(delay, cb, Priority.NORMAL); } /* * Acts like delaycb from the libasync C library * * (non-Javadoc) * * @see edu.harvard.syrah.sbon.async.EventLoopIF#registerTimerEvent(long, * edu.harvard.syrah.sbon.async.CB0) */ public CB0 registerTimerCB(long delay, CB0 cb, Priority priority) { cb.ts = System.currentTimeMillis() + delay; Queue<CB0> eventQueue = null; if (delay != 0) { eventQueue = getEventQueue(priority); } else { eventQueue = getNowEventQueue(priority); } eventQueue.add(cb); /* * TODO handle this properly */ /* * if (eventQueue.size() > EVENT_QUEUE_LIMIT) { log.warn("The queue with * pri=" + priority + " has grown beyong its limit (" + EVENT_QUEUE_LIMIT + ") * size=" + eventQueue.size() + " cb=" + cb); } */ // Make sure that the selector is not block if this is called from another thread selector.wakeup(); return cb; } public void registerTimerCB(Barrier barrier, CB0 cb) { registerTimerCB(barrier, cb, Priority.NORMAL); } public void registerTimerCB(Barrier barrier, CB0 cb, Priority priority) { registerTimerCB(barrier, 0, cb, priority); } public void registerTimerCB(Barrier barrier, long delay, CB0 cb) { registerTimerCB(barrier, delay, cb, Priority.NORMAL); } public void registerTimerCB(Barrier barrier, long delay, CB0 cb, Priority priority) { barrier.registerTimerCB(delay, cb, priority); } public long deregisterTimerCB(CB0 cb) { return deregisterTimerCB(cb, Priority.NORMAL); } public long deregisterTimerCB(CB0 cb, Priority priority) { assert cb != null; Queue<CB0> eventQueue = getEventQueue(priority); if (!eventQueue.remove(cb)) { log.error("The eventQueue=" + POut.toString(eventQueue) + " doesn't contain the cb=" + cb); } return cb.ts - System.currentTimeMillis(); } public void registerCommCB(SelectableChannel channel, int selectionKey) throws ClosedChannelException { assert channel != null; ChannelSub channelSub = channelSubTable.get(channel); assert channelSub != null : "channelSub is null for registerCommCB call. channel=" + channel; assert channelSub.checkCommCBs(selectionKey); try { // Register with selector while preserving previous registrations if (channel.keyFor(selector) != null) { // log.debug("Updating key."); channel.register(selector, channel.keyFor(selector).interestOps() | selectionKey, channelSub); } else { // log.debug("Adding new key."); channel.register(selector, selectionKey, channelSub); } } catch (IllegalArgumentException e) { log.error("Illegal key selector mask: " + e + " selectionKey=" + selectionKey); } catch (CancelledKeyException e) { log.warn(("CancelledKeyException channel=" + channel + " e=" + e)); } // log.debug("channelSub: channel=" + channel + " "+ channelSub); } public void deregisterCommCB(SelectableChannel channel, int selectionKey) throws ClosedChannelException { assert channel != null; ChannelSub channelSub = channelSubTable.get(channel); // assert channelSub != null; if (channelSub == null) { log.warn("channelSub==null channel=" + channel + " Ignoring."); return; } SelectionKey key = channel.keyFor(selector); assert key != null; // Deregister with selector while preserving previous registrations channel.register(selector, key.interestOps() & (~selectionKey), channelSub); } public void deregisterAllCommCBs(SelectableChannel channel) throws ClosedChannelException { assert channel != null; ChannelSub channelSub = channelSubTable.get(channel); // assert channelSub != null : "channel=" + channel; if (channelSub == null) { log.warn("channelSub==null channel=" + channel + " Ignoring."); return; } channel.register(selector, 0, channelSub); } public CB1R<Boolean, SelectionKey> getCommCB(SelectableChannel channel, int selectionKey) { assert channel != null; ChannelSub channelSub = channelSubTable.get(channel); // Is this an unknown channel? if (channelSub == null) return null; return channelSub.getCommCB(selectionKey); } public void setCommCB(SelectableChannel channel, int selectionKey, CB1R<Boolean, SelectionKey> commCB) { assert channel != null; assert commCB != null; ChannelSub channelSub = channelSubTable.get(channel); if (channelSub == null) { channelSub = new ChannelSub(); channelSubTable.put(channel, channelSub); } channelSub.setCommCBs(selectionKey, commCB); } public void unsetCommCB(SelectableChannel channel, int selectionKey) { assert channel != null; ChannelSub channelSub = channelSubTable.get(channel); assert channelSub != null; assert (!channel.keyFor(selector).isValid() || (channel.keyFor(selector) .interestOps() & selectionKey) == 0); channelSub.setCommCBs(selectionKey, null); if (channelSub.isEmpty()) { channelSubTable.remove(channel); } } public void unsetAllCommCBs(SelectableChannel channel) { assert channel != null; ChannelSub channelSub = channelSubTable.get(channel); if (channelSub != null) { /* * TODO This assertion is violated when the key has already been * cancelled. */ // assert channel.keyFor(selector) == null || // (channel.keyFor(selector).interestOps() & SELECTION_KEY_ALL_OPS) == 0; channelSub.setCommCBs(SELECTION_KEY_ALL_OPS, null); channelSubTable.remove(channel); } else { log.warn("Trying to close an already closed channel=" + channel); } } public void main() { log.debug(ANSI.color(Color.LIGHTRED, "Starting main event loop...")); long slackTime; long nextEventTS; // PTimer pt = new PTimer(false); while (!forceExit && !(loopExit && eventQueuesEmpty())) { // log.debug(" Starting new event loop iteration"); // pt.start(); nextEventTS = handleEventQueues(); // pt.stop(log, "handleEventQueues took"); slackTime = (nextEventTS == INFINITE_SLACKTIME) ? INFINITE_SLACKTIME : (nextEventTS - System.currentTimeMillis()); // log.debug("slackTime=" + (slackTime == INFINITE_SLACKTIME ? "INF" : // String.valueOf(slackTime)) + " eventQueuesEmpty=" + // eventQueuesEmpty()); if ((!now_eQ_HP.isEmpty() || !now_eQ_NP.isEmpty() || !now_eQ_LP.isEmpty()) && (slackTime == INFINITE_SLACKTIME)) { log.warn("Now_queues have events but slacktime is INF. Resetting."); slackTime = 0; } assert (slackTime != INFINITE_SLACKTIME) || eventQueuesEmpty() : "slackTime=" + slackTime; if (showIdle && slackTime != INFINITE_SLACKTIME && slackTime > 0) log.info("Idle for " + slackTime + " ms"); // now_eQ_NORM.size=" + // now_eQ_NP.size()); if (!forceExit && !loopExit) { // pt.start(); handleSelector(slackTime); // pt.stop(log, "handleSelector (slackTime=" + (slackTime == // INFINITE_SLACKTIME ? "INF" : String.valueOf(slackTime)) + ") took"); // pt.start(); handleSelectCallbacks(); // pt.stop(log, "handleSelectCBs took"); // log.debug("numSelectors=" + numSelectorKeys); } } log.main("Exited main event loop."); } public void forceExit() { forceExit = true; } public void exit() { log.main(ANSI.color(Color.LIGHTRED, "Ready to exit main event loop...")); loopExit = true; } public boolean shouldExit() { return loopExit; } public void handleNetwork() { handleSelector(-1); handleSelectCallbacks(); } public boolean checkChannelState(SelectableChannel channel, int selectionKey) { handleSelector(-1); for (Iterator keyIt = selector.selectedKeys().iterator(); keyIt.hasNext();) { SelectionKey key = (SelectionKey) keyIt.next(); if (key.channel().equals(channel) && key.isValid() && ((key.readyOps() & selectionKey) == selectionKey)) { return true; } } return false; } private boolean eventQueuesEmpty() { return eQ_HP.isEmpty() && eQ_NP.isEmpty() && eQ_LP.isEmpty() && now_eQ_HP.isEmpty() && now_eQ_NP.isEmpty() && now_eQ_LP.isEmpty(); } private Queue<CB0> getEventQueue(Priority priority) { Queue<CB0> eventQueue = null; switch (priority) { case HIGH: { eventQueue = eQ_HP; break; } case NORMAL: { eventQueue = eQ_NP; break; } case LOW: { eventQueue = eQ_LP; break; } default: { log.error("Unknown priority"); break; } } return eventQueue; } private Queue<CB0> getNowEventQueue(Priority priority) { Queue<CB0> eventQueue = null; switch (priority) { case HIGH: { eventQueue = now_eQ_HP; break; } case NORMAL: { eventQueue = now_eQ_NP; break; } case LOW: { eventQueue = now_eQ_LP; break; } default: { log.error("Unknown priority"); break; } } return eventQueue; } private long handleEventQueues() { long startTime = 0; long globalNextTS = INFINITE_SLACKTIME; long processingTime = 0; boolean HP_hasSlack = false; boolean NP_hasSlack = false; boolean LP_hasSlack = false; // while ((!(eQ_HP.isEmpty() && now_eQ_HP.isEmpty()) && !HP_hasSlack) // || (!(eQ_NP.isEmpty() && now_eQ_NP.isEmpty()) && !NP_hasSlack) // || (!(eQ_LP.isEmpty() && now_eQ_LP.isEmpty()) && !LP_hasSlack)) { while (((!eQ_HP.isEmpty() && !HP_hasSlack) || !now_eQ_HP.isEmpty()) || ((!eQ_NP.isEmpty() && !NP_hasSlack) || !now_eQ_NP.isEmpty()) || ((!eQ_LP.isEmpty() && !LP_hasSlack) || !now_eQ_LP.isEmpty())) { // log.debug("Executing events..."); long currentTime = System.currentTimeMillis(); if (startTime == 0) startTime = currentTime; processingTime = currentTime - startTime; // log.debug("processing.remaining=" + (MAX_EVENT_QUEUE_PERIOD - // processingTime)); if (processingTime >= MAX_EVENT_QUEUE_PERIOD) { // Make sure that we continue the event queue processing globalNextTS = currentTime; break; } HP_hasSlack = false; NP_hasSlack = false; LP_hasSlack = false; globalNextTS = INFINITE_SLACKTIME; boolean handleNext = true; /* HIGH */ // log.debug("Considering HIGH queue..."); if (!eQ_HP.isEmpty() && handleNext) { long nextEventTS = handleEvent(HIGH, currentTime); if (nextEventTS > currentTime) { HP_hasSlack = now_eQ_HP.isEmpty(); // log.debug("HIGH has slack"); } else { handleNext = false; } globalNextTS = Math.min(nextEventTS, globalNextTS); } // log.debug("Considering now_HIGH queue..."); if (!now_eQ_HP.isEmpty() && handleNext) { long nextEventTS = handleNowEvent(HIGH); handleNext = false; globalNextTS = Math.min(nextEventTS, globalNextTS); } /* NORMAL */ // log.debug("Considering NORMAL queue..."); if (!eQ_NP.isEmpty() && handleNext) { long nextEventTS = handleEvent(NORMAL, currentTime); if (nextEventTS > currentTime) { NP_hasSlack = now_eQ_NP.isEmpty(); // log.debug("NORMAL has slack"); } else { handleNext = false; } globalNextTS = Math.min(nextEventTS, globalNextTS); } // log.debug("Considering now_NORMAL queue..."); if (!now_eQ_NP.isEmpty() && handleNext) { long nextEventTS = handleNowEvent(NORMAL); handleNext = false; globalNextTS = Math.min(nextEventTS, globalNextTS); } /* LOW */ // log.debug("Considering LOW queue..."); if (!eQ_LP.isEmpty() && handleNext) { long nextEventTS = handleEvent(LOW, currentTime); if (nextEventTS > currentTime) { LP_hasSlack = now_eQ_LP.isEmpty(); // log.debug("LOW has slack"); } else { handleNext = false; } globalNextTS = Math.min(nextEventTS, globalNextTS); } // log.debug("Considering now_LOW queue..."); if (!now_eQ_LP.isEmpty() && handleNext) { long nextEventTS = handleNowEvent(LOW); handleNext = false; globalNextTS = Math.min(nextEventTS, globalNextTS); } // log.debug("globalNextTS=" + globalNextTS); } // log.debug("globalNextTS=" + (globalNextTS == INFINITE_SLACKTIME ? "INF" : // String.valueOf(globalNextTS)) // + " eventQueueTime=" + (MAX_EVENT_QUEUE_PERIOD - processingTime)); return globalNextTS; } private long handleEvent(Priority priority, long currentTime) { Queue<CB0> eventQueue = getEventQueue(priority); CB0 cb = eventQueue.peek(); long eventSlackTime = cb.ts - currentTime; // log.debug("pri=" + priority + " eventSlackTime=" + eventSlackTime); // Do we need to execute the event at once? if (eventSlackTime <= 0) { assert cb != null; PTimer pt = null; if (showCBTime) pt = new PTimer(); // log.debug("cb=" + nextEvent.cb); cb.cb(CBResult.OK()); // pt.stop(log, "Event CB (cb=" + nextEvent.cb + ") took"); if (showCBTime) { pt.stop(); if (MAX_CB_TIME > 0 && pt.getTime() > MAX_CB_TIME) log.warn("cb=" + cb + " took " + pt.toString() + " > " + MAX_CB_TIME + " ms"); } eventQueue.remove(cb); if (!eventQueue.isEmpty()) { cb = eventQueue.peek(); return cb.ts; } return INFINITE_SLACKTIME; } // log.debug("pri=" + priority + " nextEventTS=" + nextEvent.ts); return cb.ts; } private long handleNowEvent(Priority priority) { Queue<CB0> eventQueue = getNowEventQueue(priority); CB0 cb = eventQueue.remove(); assert cb != null && cb != null; PTimer pt = null; if (showCBTime) pt = new PTimer(); // log.info("now_cb=" + nextEvent.cb); cb.call(CBResult.OK()); // pt.stop(log, "Now_Event CB (cb=" + nextEvent.cb + ") took"); if (showCBTime) { pt.stop(); if (MAX_CB_TIME > 0 && pt.getTime() > MAX_CB_TIME) log.warn("cb=" + cb + " took " + pt.toString() + " > " + MAX_CB_TIME + " ms"); } if (!eventQueue.isEmpty()) { cb = eventQueue.peek(); return cb.ts; } return INFINITE_SLACKTIME; } private void handleSelector(long slackTime) { if (slackTime > 0) { if (slackTime == INFINITE_SLACKTIME) { // Block indefinitely try { // log.debug("Sleeping..." /* + eQ=" + eventQueue.size() */ ); selector.select(); // log.debug("Woken up..."); //assert (eventqueues not empty || forceExit || !selector.selectedKeys().isEmpty()) : "The selector returned without any selected keys. This should never happen."; } catch (IOException e) { log.error("Could not complete select(): " + e); } } else { // Block for slacktime try { // log.debug("Sleeping for " + slackTime + " ms"); selector.select(slackTime); // log.debug("Woken up..."); } catch (IOException e) { log.error("Could not complete select(slackTime): " + e); } } } else { /* * TODO It would make sense to plan in advance here and sleep until the * next event. A selectNow would only be necessary if the next event has * to be executed immediately. */ // Do an instant select (non blocking) try { selector.selectNow(); } catch (IOException e) { log.error("Could not complete selectNow(): " + e); } } } private void handleSelectCallbacks() { try { // PTimer pt = new PTimer(); numSelectorKeys = selector.selectedKeys().size(); //log.debug("selectedKeys.size=" + numSelectorKeys); for (Iterator keyIt = selector.selectedKeys().iterator(); keyIt.hasNext();) { SelectionKey key = (SelectionKey) keyIt.next(); CB1R<Boolean, SelectionKey> cbComm = null; // commEvent.ts = System.currentTimeMillis(); ChannelSub channelSub = (ChannelSub) key.attachment(); int readyOps = key.readyOps(); //log.debug("channel=" + key.channel()); if (key.isValid() && key.isAcceptable()) { // log.debug("Handling comms cb with key.isAcceptable()"); if (channelSub.acceptCB != null) { cbComm = channelSub.acceptCB; // PTimer pt = new PTimer(); if (cbComm.cb(CBResult.OK(), key) && key.isValid()) { readyOps &= ~SelectionKey.OP_ACCEPT; } // pt.stop(log, "Accept CB took"); } else { log.debug("acceptCB=null channel=" + key.channel() + "cST=" + POut.toString(channelSubTable)); } } if (key.isValid() && key.isConnectable()) { // log.debug("Handling comms cb with key.isConnectable()"); if (channelSub.connectCB != null) { cbComm = channelSub.connectCB; // PTimer pt = new PTimer(); if (cbComm.cb(CBResult.OK(), key) && key.isValid()) { readyOps &= ~SelectionKey.OP_ACCEPT; } // pt.stop(log, "Connect CB took"); } else { log.debug("connectCB=null channel=" + key.channel() + " cST=" + POut.toString(channelSubTable)); } } if (key.isValid() && key.isReadable()) { //log.debug("Handling comms cb with key.isReadable()"); if (channelSub.readCB != null) { cbComm = channelSub.readCB; // PTimer pt = new PTimer(); if (cbComm.cb(CBResult.OK(), key) && key.isValid()) { readyOps &= ~SelectionKey.OP_READ; } // pt.stop(log, "Read CB took"); } else { log.debug("readCB=null channel=" + key.channel() + " cST=" + POut.toString(channelSubTable)); } } if (key.isValid() && key.isWritable()) { // log.debug("Handling comms cb with key.isWritable()"); if (channelSub.writeCB != null) { cbComm = channelSub.writeCB; // PTimer pt = new PTimer(); if (cbComm.cb(CBResult.OK(), key) && key.isValid()) { readyOps &= ~SelectionKey.OP_WRITE; } // pt.stop(log, "Write CB took"); } else { log.debug("writeCB=null channel=" + key.channel() + "cST=" + POut.toString(channelSubTable)); } } // Have you dealt with all the interest ops? if ((!key.isValid()) || readyOps == 0) { try { keyIt.remove(); } catch (ConcurrentModificationException e) { log.error(e.toString()); } } } // pt.stop(log, "HandleSelectCBs took"); } catch (ClosedSelectorException e) { log.warn("Selector closed."); shutdown(); } } protected void shutdown() { if (!ranShutdown) { log.debug("Running shutdown hook."); forceExit = true; try { log.debug("Closing selector with all connections."); selector.close(); } catch (IOException e) { log.error("Could not close selector"); } log.debug("Shutdown hook complete."); ranShutdown = true; } } class ChannelSub { protected CB1R<Boolean, SelectionKey> acceptCB = null; protected CB1R<Boolean, SelectionKey> connectCB = null; protected CB1R<Boolean, SelectionKey> readCB = null; protected CB1R<Boolean, SelectionKey> writeCB = null; void setCommCBs(int selectionKey, CB1R<Boolean, SelectionKey> commCB) { if ((selectionKey & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { acceptCB = commCB; } if ((selectionKey & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT) { connectCB = commCB; } if ((selectionKey & SelectionKey.OP_READ) == SelectionKey.OP_READ) { readCB = commCB; } if ((selectionKey & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) { writeCB = commCB; } } CB1R<Boolean, SelectionKey> getCommCB(int selectionKey) { if ((selectionKey & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { return acceptCB; } if ((selectionKey & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT) { return connectCB; } if ((selectionKey & SelectionKey.OP_READ) == SelectionKey.OP_READ) { return readCB; } if ((selectionKey & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) { return writeCB; } log.error("Unknown selectionKey"); return null; } boolean checkCommCBs(int selectionKey) { if ((selectionKey & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT && acceptCB == null) { return false; } if ((selectionKey & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT && connectCB == null) { return false; } if ((selectionKey & SelectionKey.OP_READ) == SelectionKey.OP_READ && readCB == null) { return false; } if ((selectionKey & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && writeCB == null) { return false; } return true; } boolean isEmpty() { return acceptCB == null && connectCB == null && readCB == null && writeCB == null; } public String toString() { return "acceptCB=" + (acceptCB != null) + "[" + (acceptCB != null ? acceptCB.toString() : "") + "] " + "connectCB=" + (connectCB != null) + "[" + (connectCB != null ? connectCB.toString() : "") + "] " + "readCB=" + (readCB != null) + "[" + (readCB != null ? readCB.toString() : "") + "] " + "writeCB=" + (writeCB != null) + "[" + (writeCB != null ? writeCB.toString() : "") + "]"; } } public static void main(String[] args) { EL.set(new EL()); log.main("Registering callback"); EL.get().registerTimerCB(new SimpleCB("NORM-3", 1000), HIGH); EL.get().registerTimerCB(new SimpleCB("NORM-2", 1000), NORMAL); EL.get().registerTimerCB(new SimpleCB("NORM-1", 1000), LOW); // EventLoop.get().registerTimerCB(100, new SimpleCB("NORM-A", 1000), // NORMAL); // EventLoop.get().registerTimerCB(200, new SimpleCB("NORM-B", 1000), // NORMAL); // EventLoop.get().registerTimerCB(300, new SimpleCB("NORM-C", 1000), // NORMAL); EL.get().dumpState(true); // EventLoop.get().registerTimerCB(new SimpleCB("HIGH", 1500), HIGH); // EventLoop.get().registerTimerCB(new SimpleCB("NORM", 1000), NORMAL); // EventLoop.get().registerTimerCB(new SimpleCB("LOW", 500), LOW); CB0 cbWhileLoop = new CB0() { protected void cb(CBResult result) { // do stuff boolean exitCondition = false; if (!exitCondition) EL.get().registerTimerCB(this); } }; EL.get().registerTimerCB(cbWhileLoop); EL.get().main(); } } class SimpleCB extends CB0 { private long lastRun; private long interval; public SimpleCB(String name, long interval) { super(name); this.interval = interval; } protected void cb(CBResult result) { //EventLoop.get().dumpState(true); long currentTime = System.currentTimeMillis(); if (lastRun == 0) lastRun = currentTime; POut.p("Running cb " + toString() + "... " + (currentTime - lastRun)); lastRun = currentTime; //EventLoop.get().registerTimerCB(interval, this); } } --- NEW FILE: Barrier.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 28, 2005 */ package edu.harvard.syrah.sbon.async; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0R; import edu.harvard.syrah.sbon.async.EL.Priority; /** * * Allows us to synchronize callbacks * */ public class Barrier { private static final Log log = new Log(Barrier.class); private static final long PREDICATE_INTERVAL = 1000; private boolean triggered = false; private boolean startState; private boolean activated = true; private int count; private CB0 cb; private long delay; protected CB0 userCB; public Barrier(boolean startState) { this(startState, 0, null); } public Barrier(int count) { this(count == 0 ? true : false, count, null); } public Barrier(int count, CB0 cb) { this(count == 0 ? true : false, count, cb); } public Barrier(final CB0R<Boolean> cbPredicate) { this.startState = false; this.count = 1; this.triggered = startState; EL.get().registerTimerCB(new CB0() { protected void cb(CBResult resultOK) { if (cbPredicate.call(resultOK)) Barrier.this.join(); else EL.get().registerTimerCB(PREDICATE_INTERVAL, this); } }); } private Barrier(boolean startState, int count, CB0 cb) { this.startState = startState; this.count = count; this.cb = cb; this.triggered = startState; } public void activate() { activated = true; } public void deactivate() { activated = false; } public boolean isActive() { return activated; } public void fork() { assert startState || !triggered || !activated : "Could not fork: startState=" + startState + " triggered=" + triggered + " activated=" + activated; startState = false; count++; triggered = false; } public void join() { assert startState || !triggered || !activated : "Could not join: startState=" + startState + " triggered=" + triggered + " activated=" + activated; startState = false; count--; if (count == 0 && activated) { triggered = true; if (cb != null) { log.debug("Barrier triggered."); // Register the event callback EL.get().registerTimerCB(delay, cb); } if (userCB != null) { EL.get().registerTimerCB(new CB0() { public void cb(CBResult result) { userCB.cb(CBResult.OK()); } }); } } } /* * added by glp */ public void setNumForks(int newCount){ this.count = newCount; if (newCount > 0) { triggered = false; } } public void remove() { log.debug("Barrier removed."); } public void registerCB(CB0 myUserCB) { this.userCB = myUserCB; if (triggered) { EL.get().registerTimerCB(new CB0() { public void cb(CBResult result) { userCB.cb(result); } }); } } void registerTimerCB(long delay, CB0 cb, Priority priority) { this.cb = cb; this.delay = delay; if (triggered) { log.debug("Barrier triggered."); // Register the event callback EL.get().registerTimerCB(delay, cb, priority); } } public String toString() { return "count=" + count + " triggered=" + triggered; } } --- NEW FILE: CBResult.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jul 11, 2005 */ package edu.harvard.syrah.sbon.async; import java.io.Serializable; public class CBResult implements Serializable { static final long serialVersionUID = 1000000001L; public enum CBState {OK, ERROR, UNKNOWN, TIMEOUT} public CBState state; public String what; private CBResult(CBState state) { this(state, null); } private CBResult(CBState state, String what) { this.state = state; this.what = what; } public static CBResult OK() { return new CBResult(CBState.OK); } public static CBResult OK(String what) { return new CBResult(CBState.OK, what); } public static CBResult ERROR() { return new CBResult(CBState.ERROR); } public static CBResult ERROR(String what) { return new CBResult(CBState.ERROR, what); } public static CBResult ERROR(Exception e) { return new CBResult(CBState.ERROR, e.toString()); } public static CBResult UNKNOWN() { return new CBResult(CBState.UNKNOWN); } public static CBResult UNKNOWN(String what) { return new CBResult(CBState.UNKNOWN, what); } public static CBResult TIMEOUT() { return new CBResult(CBState.TIMEOUT); } public static CBResult TIMEOUT(String what) { return new CBResult(CBState.TIMEOUT, what); } public String toString() { switch (state) { case OK : return "OK"; case ERROR : return "ERROR(" + what + ")"; case UNKNOWN : return "UNKNWON(" + what + ")"; case TIMEOUT : return "TIMEOUT(" + what + ")"; default : return "?"; } } } --- NEW FILE: CBQueue.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Nov 8, 2005 */ package edu.harvard.syrah.sbon.async; import java.util.LinkedList; import java.util.List; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; public class CBQueue { private static final Log log = new Log(CBQueue.class); private static final long RETRY_INTERVAL = 1000; private int maxCBs; private List<CB1<CB1<Boolean>>> waitingCBs = new LinkedList<CB1<CB1<Boolean>>>(); private int outstandingCBs = 0; public CBQueue(int maxCBs) { this.maxCBs = maxCBs; } public void enqueueDequeue(final CB1<CB1<Boolean>> newCB) { //log.debug("outstandingCBs=" + outstandingCBs + " maxCBs=" + maxCBs + " waitingCBs.size=" + waitingCBs.size()); if (newCB != null) waitingCBs.add(newCB); if ((maxCBs == 0 || outstandingCBs < maxCBs) && !waitingCBs.isEmpty()) { final CB1<CB1<Boolean>> cb = waitingCBs.remove(0); outstandingCBs++; cb.call(CBResult.OK(), new CB1<Boolean>() { protected void cb(CBResult result, Boolean handled) { outstandingCBs--; if (handled) { dequeue(); } else { waitingCBs.add(0, cb); EL.get().registerTimerCB(RETRY_INTERVAL, new CB0() { protected void cb(CBResult result) { dequeue(); } }); } } }); } } public void dequeue() { enqueueDequeue(null); } } --- NEW FILE: LoopIt.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Aug 11, 2005 */ package edu.harvard.syrah.sbon.async; import java.util.ConcurrentModificationException; import java.util.Iterator; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0R; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; import edu.harvard.syrah.sbon.async.CallbacksIF.CB2; import edu.harvard.syrah.sbon.async.EL.Priority; public class LoopIt<T> { protected static final Log log = new Log(LoopIt.class); // Maximum time of single loop iteration in nanoseconds private static final long MAX_SINGLE_ITERATION_TIME = 1 * 1000000; private String name; protected CB2<T, CB0R<Boolean>> cbIteratorRecursion; private Priority priority; protected Iterator<T> it; private boolean remove; private boolean smartScheduling; protected Barrier loopBarrier; public LoopIt(Iterable<T> it, final CB1<T> cbRecursion) { this(null, it, false, false, new CB2<T, CB0R<Boolean>>() { protected void cb(CBResult result, T item, CB0R<Boolean> cbMore) { cbRecursion.call(result, item); cbMore.callOK(); } }); } public LoopIt(Iterable<T> it, final CB2<T, CB0> cbRecursion) { this(null, it, false, false, new CB2<T, CB0R<Boolean>>() { protected void cb(CBResult result, T item, final CB0R<Boolean> cbMore) { cbRecursion.call(result, item, new CB0() { protected void cb(CBResult result) { cbMore.callOK(); } }); } }); } public LoopIt(String name, Iterable<T> it, CB2<T, CB0R<Boolean>> cbIteratorRecursion) { this(name, it, true, false, cbIteratorRecursion, Priority.NORMAL); } public LoopIt(String name, Iterable<T> it, boolean remove, CB2<T, CB0R<Boolean>> cbIteratorRecursion) { this(name, it, remove, false, cbIteratorRecursion, Priority.NORMAL); } public LoopIt(String name, Iterable<T> it, boolean remove, boolean smartScheduling, CB2<T, CB0R<Boolean>> cbIteratorRecursion) { this(name, it, remove, smartScheduling, cbIteratorRecursion, Priority.NORMAL); } public LoopIt(String name, Iterable<T> it, boolean remove, boolean smartScheduling, CB2<T, CB0R<Boolean>> cbIteratorRecursion, Priority priority) { this.name = name; this.it = it.iterator(); this.remove = remove; this.smartScheduling = smartScheduling; this.cbIteratorRecursion = cbIteratorRecursion; this.priority = priority; loopBarrier = new Barrier(1); } public void execute(CB0 cbDone) { EL.get().registerTimerCB(execute(), cbDone); } public Barrier execute() { EL.get().registerTimerCB(new CB0(name) { protected void cb(CBResult resultOK) { recurseIterator(); } }); return loopBarrier; } long startTime; long runningTime; public void recurseIterator() { if (startTime == 0) startTime = System.nanoTime(); if (it.hasNext()) { try { final T item = it.next(); runningTime = System.nanoTime() - startTime; if (smartScheduling && runningTime < MAX_SINGLE_ITERATION_TIME) { cbIteratorRecursion.call(CBResult.OK(), item, new CB0R<Boolean>() { protected Boolean cb(CBResult result) { // Try to remove the object after an iteration to reclaim memory if (remove) { try { it.remove(); } catch (UnsupportedOperationException e) { /* ignore */ } } boolean hasNext = it.hasNext(); recurseIterator(); return hasNext; } }); } else { EL.get().registerTimerCB(new CB0(name) { protected void cb(CBResult result) { runningTime = 0; cbIteratorRecursion.call(CBResult.OK(), item, new CB0R<Boolean>() { protected Boolean cb(CBResult result) { // Try to remove the object after an iteration to reclaim memory if (remove) { try { it.remove(); } catch (UnsupportedOperationException e) { /* ignore */ } } boolean hasNext = it.hasNext(); recurseIterator(); return hasNext; } }); } }, priority); } } catch (ConcurrentModificationException e) { log.error("Concurrent loop modification: name=" + name + " it=" + it + " e=" + e); } } else { loopBarrier.join(); startTime = 0; } } } --- NEW FILE: Loop.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Aug 11, 2005 */ package edu.harvard.syrah.sbon.async; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; import edu.harvard.syrah.sbon.async.CallbacksIF.CB2; import edu.harvard.syrah.sbon.async.EL.Priority; public class Loop<T> { protected static final Log log = new Log(Loop.class); private String name; protected CB2<T, CB1<T>> cbRecursion; private Priority priority; private T startItem; protected Barrier loopBarrier; public Loop(String name, T item, CB2<T, CB1<T>> cbRecursion) { this(name, item, cbRecursion, Priority.NORMAL); } public Loop(String name, T item, CB2<T, CB1<T>> cbRecursion, Priority priority) { this.name = name; this.cbRecursion = cbRecursion; this.startItem = item; this.priority = priority; this.loopBarrier = new Barrier(false); } public Barrier execute() { recurse(startItem); /* * EventLoop.get().registerTimerCB(new EventCB(name) { protected void * cb(CBResult result, Event timerEvent) { recurse(startItem); } }, * priority); */ return loopBarrier; } public void recurse(final T item) { log.debug("Forking barrier=" + loopBarrier); loopBarrier.fork(); log.debug("Forked barrier=" + loopBarrier); EL.get().registerTimerCB(new CB0(name) { protected void cb(CBResult result) { cbRecursion.call(CBResult.OK(), item, new CB1<T>() { protected void cb(CBResult result, T item) { if (item != null) recurse(item); else { log.debug("Joining barrier" + loopBarrier); loopBarrier.join(); log.debug("Joined barrier" + loopBarrier); } } }); } }, priority); } } --- NEW FILE: Config.java --- package edu.harvard.syrah.sbon.async; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.*; import edu.harvard.syrah.prp.Log; public class Config { private static final Log log = new Log(Config.class); private static Properties configProps = System.getProperties(); public static Properties getConfigProps() { return configProps; } public static void setConfigProps(Properties props) { configProps = props; } public static String getProperty(String name, String defaultValue) { return configProps.getProperty(name, defaultValue); } public static String getProperty(String name) { return configProps.getProperty(name); } public static void read(String configRoot, String filename) { try { InputStream ip = new FileInputStream(filename); //log.info("config=" + filename); Properties configFileProps = new Properties(); configFileProps.load(ip); for (Object propertyObj : configFileProps.keySet()) { String property = (String) propertyObj; String value = configFileProps.getProperty(property); if (!Config.getConfigProps().containsKey(property) && value != null && value.length() != 0) { //log.debug("prop=" + property + " val=" + value); Config.getConfigProps().setProperty(property, value); } } SortedMap<String, String> allProps = new TreeMap<String, String>(); Properties cleanProps = new Properties(); for (Object propertyObj : Config.getConfigProps().keySet()) { //for (Iterator<Object> objIt = Config.getConfigProps().keySet().iterator(); objIt.hasNext();) { String property = (String) propertyObj; if (property.startsWith(configRoot)) { String shortName = property.substring(configRoot.length() + 1, property.length()); allProps.put(shortName, Config.getConfigProps().getProperty(property)); cleanProps.put(shortName, Config.getConfigProps().getProperty(property)); } } Config.configProps = cleanProps; log.info("configProperties=" + allProps.toString()); //log.debug("cleanProps=" + cleanProps.toString()); } catch (FileNotFoundException e1) { log.error("Could not open config file: " + e1); } catch (IOException e2) { log.error("Could not load config properties from file: " + e2); } } } --- NEW FILE: Sync.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jun 21, 2006 */ package edu.harvard.syrah.sbon.async; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; public class Sync { private static final Log log = new Log(Sync.class); private static int threadCounter = 0; /* * TODO convert this to use a thread pool */ public static void callBlocking(final CB0 cbBlocking, final CB0 cbDone) { final Thread blockingThread = new Thread("BlockingThread-" + (++threadCounter)) { @Override public void run() { super.run(); cbBlocking.callOK(); EL.get().registerTimerCB(new CB0() { protected void cb(CBResult result) { cbDone.callOK(); } }); } }; blockingThread.run(); } } --- NEW FILE: EventLoopIF.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 6, 2005 */ package edu.harvard.syrah.sbon.async; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import edu.harvard.syrah.sbon.async.CallbacksIF.*; import edu.harvard.syrah.sbon.async.EL.Priority; /** * * This is the interface of the main asynchronous event loop for the node. * */ public interface EventLoopIF { /** * Main method that runs the event loop. It should be the last call in the program's main method. */ public void main(); /** * Force the exist from the main event loop. */ public void forceExit(); /** * Gracefully exit the main loop. This will only exit the main event loop if there are no pending * timers. */ public void exit(); public boolean shouldExit(); public CB0 registerTimerCB(CB0 cb); public CB0 registerTimerCB(CB0 cbEvent, Priority priority); /** * This methods allows an object to register a callback timer. * * @param delay * @param event */ public CB0 registerTimerCB(long delay, CB0 cbEvent); public CB0 registerTimerCB(long delay, CB0 cbEvent, Priority priority); /** * Register a new callback timer when the barrier permits it. * * @param barrier * @param cbEvent */ public void registerTimerCB(Barrier barrier, CB0 cbEvent); public void registerTimerCB(Barrier barrier, CB0 cbEvent, Priority priority); public void registerTimerCB(Barrier barrier, long delay, CB0 cbEvent); public void registerTimerCB(Barrier barrier, long delay, CB0 cbEvent, Priority priority); /** * Remove an existing timer callback from the event queue. * * @param CB0 */ public long deregisterTimerCB(CB0 cb); public long deregisterTimerCB(CB0 cb, Priority priority); /** * Registers the interest in communication events. Note that selectionKey is not a bit field, * which means that calls to this method are additive. * * @param channel * @param selectionKey * @param event */ public void registerCommCB(SelectableChannel channel, int selectionKey) throws ClosedChannelException; /** * Allows an object to deregister a communication callback. * * @param channel * @param selectionKey * @throws ClosedChannelException */ public void deregisterCommCB(SelectableChannel channel, int selectionKey) throws ClosedChannelException; public void deregisterAllCommCBs(SelectableChannel channel) throws ClosedChannelException; public void setCommCB(SelectableChannel channel, int selectionKey, CB1R<Boolean, SelectionKey> commCB); public void unsetCommCB(SelectableChannel channel, int selectionKey); public CB1R<Boolean, SelectionKey> getCommCB(SelectableChannel channel, int selectionkey); public void unsetAllCommCBs(SelectableChannel channel); public boolean checkChannelState(SelectableChannel channel, int selectionKey); public void handleNetwork(); public void dumpState(boolean eventQueueDump); } --- NEW FILE: CallbacksIF.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 28, 2005 */ package edu.harvard.syrah.sbon.async; import java.util.LinkedList; import java.util.List; /** * * Asynchronous callback interface * */ public interface CallbacksIF { public abstract class AbstractCB implements Comparable { private long timeout = 0; private String name = null; protected boolean cancelled = false; private List<CB0> cancelledCBs = null; protected CB0 cbTimeout = null; protected long ts; public AbstractCB() { /* empty */ } public AbstractCB(long timeout) { setTimeout("CBTimeout", timeout); } public AbstractCB(String name) { this.name = name; } public abstract void callCBResult(CBResult result); protected boolean checkCancelled(CBResult result) { if (cbTimeout != null) { EL.get().deregisterTimerCB(cbTimeout); cbTimeout = null; } boolean returnResult = cancelled; if (result.state == CBResult.CBState.ERROR || result.state == CBResult.CBState.TIMEOUT) { cancelled = true; } return returnResult; } public void setTimeout(String name, final long timeout) { if (timeout == 0) return; this.timeout = timeout; // Register the timer for the timeout cbTimeout = EL.get().registerTimerCB(timeout, new CB0(name) { public void cb(CBResult result) { if (!cancelled) { // Call the timeout callback AbstractCB.this.callCBResult(CBResult.TIMEOUT(timeout + "ms")); AbstractCB.this.cancel(); } AbstractCB.this.cbTimeout = null; } }); } public void cancel() { cancelled = true; if (cbTimeout != null) { EL.get().deregisterTimerCB(cbTimeout); cbTimeout = null; } if (cancelledCBs != null) { for (CB0 cancelledCB : cancelledCBs) cancelledCB.callOK(); } } public void cancel(CB0 cb) { cancel(); cb.callOK(); } public boolean isCancelled() { return cancelled; } public void registerCancelledCB(CB0 cancelledCB) { if (cancelledCBs == null) cancelledCBs = new LinkedList<CB0>(); cancelledCBs.add(cancelledCB); } public void deregisterCancelledCB(CB0 cancelledCB) { cancelledCBs.remove(cancelledCB); } public void deregisterAllCancelledCBs() { cancelledCBs.clear(); } public boolean hasTimeout() { return cbTimeout != null; } public String toString() { String cbName = super.toString(); return (ts != 0 ? ts : "") + name == null ? cbName.substring(cbName.lastIndexOf('.') + 1) : name; } public long getTS() { return ts; } public int compareTo(Object obj) { assert obj instanceof AbstractCB; AbstractCB cmpCB = (AbstractCB) obj; // log.debug("this=" + this + " cmpEvent=" + cmpEvent); // Equality means that this is the same object if (this == cmpCB) { // log.debug("equal"); return 0; } /* * >= */ int comparison = (ts > cmpCB.ts) ? 1 : -1; // log.debug("comparison=" + comparison); return comparison; } } public abstract class CB0 extends AbstractCB { public CB0() { /* emtpty */ } public CB0(long timeout) { super(timeout); } public CB0(String name) { super(name); } protected abstract void cb(CBResult result); public void callCBResult(CBResult result) { if (!checkCancelled(result)) cb(result); } public void callOK() { if (!checkCancelled(CBResult.OK())) cb(CBResult.OK()); } public void callERROR() { if (!checkCancelled(CBResult.ERROR())) cb(CBResult.ERROR()); } public void call(CBResult result) { if (!checkCancelled(result)) cb(result); } } public abstract class CB1<T1> extends AbstractCB { public CB1() { /* emtpty */ } public CB1(String name) { super(name); } public CB1(long timeout) { super(timeout); } protected abstract void cb(CBResult result, T1 arg1); public void callCBResult(CBResult result) { cb(result, null); } public void call(CBResult result, T1 arg1) { if (!checkCancelled(result)) cb(result, arg1); } } public abstract class CB2<T1, T2> extends AbstractCB { public CB2() { /* emtpty */ } public CB2(long timeout) { super(timeout); } protected abstract void cb(CBResult result, T1 arg1, T2 arg2); public void callCBResult(CBResult result) { cb(result, null, null); } public void call(CBResult result, T1 arg1, T2 arg2) { if (!checkCancelled(result)) cb(result, arg1, arg2); } } public abstract class CB3<T1, T2, T3> extends AbstractCB { public CB3() { /* emtpty */ } public CB3(long timeout) { super(timeout); } public CB3(String name) { super(name); } protected abstract void cb(CBResult result, T1 arg1, T2 arg2, T3 arg3); public void callCBResult(CBResult result) { cb(result, null, null, null); } public void call(CBResult result, T1 arg1, T2 arg2, T3 arg3) { if (!checkCancelled(result)) cb(result, arg1, arg2, arg3); } } public abstract class CB4<T1, T2, T3, T4> extends AbstractCB { public CB4() { /* emtpty */ } public CB4(long timeout) { super(timeout); } public CB4(String name) { super(name); } protected abstract void cb(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4); public void callCBResult(CBResult result) { cb(result, null, null, null, null); } public void call(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4) { if (!checkCancelled(result)) cb(result, arg1, arg2, arg3, arg4); } } public abstract class CB5<T1, T2, T3, T4, T5> extends AbstractCB { public CB5() { /* emtpty */ } public CB5(long timeout) { super(timeout); } protected abstract void cb(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5); public void callCBResult(CBResult result) { cb(result, null, null, null, null, null); } public void call(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5) { if (!checkCancelled(result)) cb(result, arg1, arg2, arg3, arg4, arg5); } } public abstract class CB6<T1, T2, T3, T4, T5, T6> extends AbstractCB { public CB6() { /* emtpty */ } public CB6(long timeout) { super(timeout); } protected abstract void cb(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6); public void callCBResult(CBResult result) { cb(result, null, null, null, null, null, null); } public void call(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6) { if (!checkCancelled(result)) cb(result, arg1, arg2, arg3, arg4, arg5, arg6); } } public abstract class CB7<T1, T2, T3, T4, T5, T6, T7> extends AbstractCB { public CB7() { /* emtpty */ } public CB7(long timeout) { super(timeout); } protected abstract void cb(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7); public void callCBResult(CBResult result) { cb(result, null, null, null, null, null, null, null); } public void call(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7) { if (!checkCancelled(result)) cb(result, arg1, arg2, arg3, arg4, arg5, arg6, arg7); } } public abstract class CB0R<R> extends AbstractCB { public CB0R() { /* emtpty */ } public CB0R(long timeout) { super(timeout); } protected abstract R cb(CBResult result); public void callCBResult(CBResult result) { cb(result); } public R call(CBResult result) { if (!checkCancelled(result)) return cb(result); return null; } public void callOK() { if (!checkCancelled(CBResult.OK())) cb(CBResult.OK()); } } public abstract class CBR<R, T1> extends AbstractCB { public CBR() { /* emtpty */ } public CBR(long timeout) { super(timeout); } protected abstract R cb(CBResult result, T1 arg1); public void callCBResult(CBResult result) { cb(result, null); } public R call(CBResult result, T1 arg1) { if (!checkCancelled(result)) return cb(result, arg1); return null; } } public abstract class CB1R<R, T1> extends AbstractCB { public CB1R() { /* emtpty */ } public CB1R(long timeout) { super(timeout); } protected abstract R cb(CBResult result, T1 arg1); public void callCBResult(CBResult result) { cb(result, null); } public R call(CBResult result, T1 arg1) { if (!checkCancelled(result)) return cb(result, arg1); return null; } } public abstract class CB2R<R, T1, T2> extends AbstractCB { public CB2R() { /* emtpty */ } public CB2R(long timeout) { super(timeout); } protected abstract R cb(CBResult result, T1 arg1, T2 arg2); public void callCBResult(CBResult result) { cb(result, null, null); } public R call(CBResult result, T1 arg1, T2 arg2) { if (!checkCancelled(result)) return cb(result, arg1, arg2); return null; } } public abstract class CB3R<R, T1, T2, T3> extends AbstractCB { public CB3R() { /* emtpty */ } public CB3R(long timeout) { super(timeout); } protected abstract R cb(CBResult result, T1 arg1, T2 arg2, T3 arg3); public void callCBResult(CBResult result) { cb(result, null, null, null); } public R call(CBResult result, T1 arg1, T2 arg2, T3 arg3) { if (!checkCancelled(result)) return cb(result, arg1, arg2, arg3); return null; } } public abstract class CB4R<R, T1, T2, T3, T4> extends AbstractCB { public CB4R() { /* emtpty */ } public CB4R(long timeout) { super(timeout); } protected abstract R cb(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4); public void callCBResult(CBResult result) { cb(result, null, null, null, null); } public R call(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4) { if (!checkCancelled(result)) return cb(result, arg1, arg2, arg3, arg4); return null; } } } |