Multi Modal Processing

Luca Longinotti

While many projects in jAER have used events from multiple input devices, there is not yet a finalized system for processing streams of events coming from multiple devices. Here we describe the state of the system that currently exists.

You can see the system that is currently in development by plugging in two or more sensors and changing the default argument of the JAERViewer class's constructor to "true". The central idea is that event packets are collected from multiple sources and then merged into a single packet, with each event tagged with a "source" bit that identifies which device it came from.

The new system adds several new features to jAER:

  • Multi-Input filters. These filters are an extension to the existing event filters and can be used quite similarly - you build your filter class as an extension of the MultiSourceProcessor class and implement the abstract filterPacket(), resetFilter() and initFilter() methods. The difference is that there is an additional abstract method, getInputNames(), in which you specify the names of the input sources. The filter controls then have a new "source-control", allowing the user to link each source to the desired event source. The event source can be either an input device or another filter. With the possibility of multiple inputs, the concept of a "Filter-Chain" gives way to a "Filter Network", in which nodes represent filters and edges represent the streams of event packets passed between them. The MultiSourceProcessor class takes care of merging event packets from different sources before feeding them into a filter. The "source" bit of each event is used to identify which input source the event came from. (A complete example filter is given near the end of this page.)

  • New GUI. There is now a new, more unified GUI, which displays all viewers, as well as the controls, in a single window. Filters can create their own custom display windows and add them to the GUI.

  • Custom Controls. Users can now create their own control panels and add them to the Filter-Controls panel. This can be useful if the object that needs controlling is only defined after the filter is initialized, or when we would like to individually control an array of objects whose number is not known in advance, as in the sketch below.
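
For illustration, here is a minimal sketch of the kind of control panel this makes possible: a panel built at runtime with one slider per tracked object. It uses only standard Swing; the call that actually attaches the panel to the Filter-Controls panel is deliberately left out, since that hook belongs to the MultiInputPanel API and is not reproduced here.

import javax.swing.JButton;
import javax.swing.JPanel;
import javax.swing.JSlider;

// Sketch only: builds a control panel whose contents are not known until
// runtime. Registering the panel with the Filter-Controls panel is omitted.
public class RuntimeControlsSketch {

    /** One slider per tracked object; the number of objects is only known
     *  after the filter has started processing events. */
    public static JPanel buildControls(int numTrackedObjects) {
        JPanel panel = new JPanel();
        for (int i = 0; i < numTrackedObjects; i++) {
            JSlider gain = new JSlider(0, 100, 50);
            gain.setToolTipText("Gain for object " + i);
            panel.add(gain);
        }
        panel.add(new JButton("Reset all"));
        return panel;
    }
}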

Figure 3 shows the class structure of the changes to jAER. Changes were made so as to be backwards-compatible with the existing version. The new mode can be disabled with a boolean switch in the startup code (see the constructor of the JAERViewer class - make sure it is set to "false" before committing, at least until the multi-input mode is fully developed and tested). The scheme for coordinating threads is shown in Figure 1. The AEViewers are allowed to run and collect data from their respective sensors in parallel. A CyclicBarrier object (from the java.util.concurrent package) ensures that all AEViewers pause at a certain point in their loops. Once all AEViewers are waiting, the GlobalViewer.ViewLoop Runnable is run. This Runnable executes the event-processing methods. Once it is complete, the CyclicBarrier releases the AEViewers and allows them to continue their loops.

Figure 1: The program loop: Each AEViewer lives on a separate thread, and gathers events from an input device. A Cyclic Barrier object then causes each AEViewer to hang while the GlobalViewer executes its processing tasks on the packets produced by the AEViewers. Once this is done, control is returned to the AEViewers and the loop repeats.
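
The same coordination pattern can be reproduced with java.util.concurrent.CyclicBarrier alone. The sketch below is a simplified stand-in for the real AEViewer and GlobalViewer classes: each "viewer" thread collects events and then waits at the barrier, and the barrier action plays the role of GlobalViewer.ViewLoop, running once all viewers have arrived.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Simplified stand-in for the AEViewer / GlobalViewer coordination described above.
public class BarrierLoopSketch {

    public static void main(String[] args) {
        final int numViewers = 2;

        // The barrier action runs on the last thread to arrive, once every
        // viewer has paused - this is the role of GlobalViewer.ViewLoop.
        CyclicBarrier barrier = new CyclicBarrier(numViewers,
                () -> System.out.println("all viewers paused: process merged packets here"));

        for (int i = 0; i < numViewers; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    while (true) {
                        System.out.println("viewer " + id + " collects a packet of events");
                        barrier.await();  // hang here until the barrier action completes
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            }, "AEViewer-" + id).start();
        }
    }
}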

The changes to jAER involved adding several new core classes to the project. Here we list them.

GlobalViewer - This class creates and runs the new user-interface. It provides the display area for inputs and filter outputs, and contains the top-level code for adding new filters to the processing network.

GlobalViewer.ViewLoop - This Runnable class is called by a CyclicBarrier object (from the java.util.concurrent package) when all AEViewer loops have reached a certain wait-point in their cycles. It then commands the ProcessingNetwork to process the collected event packets.

ProcessingNetwork, ProcessingNetwork.Node - This is the generalization of the FilterChain for filters with more than one possible input source. The network contains a list of Nodes. Each node contains a list of input PacketStreams, from which event packets can be pulled, as well as a filter to process these packets. PacketStreams can either be other Nodes, in which case they return the processed event packets of those nodes, or AEViewer.Ambassadors, in which case they return event packets pulled from the input device. The ProcessingNetwork ensures that packets are processed in an order such that a node's input packets are computed before the nodes that depend on them, as in the sketch below. Figure 2 shows an example of how the event packets are passed through the processing network.
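
As an illustration of the ordering rule, the following sketch processes a node only after all of its input nodes have been processed in the current cycle. The class and method names are placeholders that follow the description above, not the actual jAER sources.

import java.util.ArrayList;
import java.util.List;

// Placeholder names; this only illustrates the dependency-ordered processing.
class ProcessingOrderSketch {

    static class Node {
        final List<Node> inputs = new ArrayList<>();
        boolean processed;

        void process() {
            // pull a packet from each input (already processed this cycle),
            // merge them, and run this node's filter on the merged packet
        }
    }

    /** Process a node only after every node it depends on has been processed. */
    static void processInOrder(Node node) {
        if (node.processed) {
            return;
        }
        for (Node input : node.inputs) {
            processInOrder(input);
        }
        node.process();
        node.processed = true;
    }
}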

AEViewer.Ambassador - This inner class of AEViewer serves as the ambassador through which the GlobalViewer communicates with each of the AEViewer threads. It implements the PacketStream interface, which enables processing nodes to pull event packets from it, and has methods for controlling synchronization between the AEViewers.

PacketStream - An interface implemented by both the ProcessingNetwork Nodes and the AEViewer Ambassadors. Any object implementing this interface must provide a getPacket() method, which returns an EventPacket.
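
Based on the description above, the interface is roughly the following; the real version may declare additional methods (for example, for naming the stream or for synchronization), so treat this as a sketch.

// Rough sketch of the contract described above; the actual interface in the
// source tree may contain more methods.
public interface PacketStream {
    EventPacket<?> getPacket();   // returns the latest packet produced by this stream
}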

MultiSourceProcessor - This extension to EventFilter2D adds a new method enabling a filter to be called with multiple input packets. The packets are merged, while maintaining timestamp order, into a single packet; the scheme by which this is done is shown in Figure 2. When the packets are merged, the "source" bit of each event is set to the index of the input packet the event belongs to. To ensure timestamp monotonicity, it is necessary to buffer the incoming events. The reason we must buffer the incoming packets, rather than simply merge them at each iteration, is illustrated by the following case: suppose that in iteration 1, packet A1 from source A arrives with a final timestamp of $t_{A1,last}=0$, and packet B1 from source B arrives with a final timestamp of $t_{B1,last}=3$. Say we merge them into packet $C1$ and proceed to process that packet. On the next iteration, packet A2 arrives with a first timestamp of $t_{A2,first}=1$. Packet $C1$, which we released on the last iteration, therefore contains an event with a later timestamp than the one that just came in from source A, so our filter would be receiving non-monotonic timestamps. One approach is to buffer both packet sources and only release events once every buffer has received a later event. The problem here is that, if a source goes silent for a while, the entire system stalls, waiting for the next event from that source. The practical solution is to specify a maximum wait time after which, if no packets have been received from a source, that source is marked as dead and skipped. If the source starts sending events again, it is added back. The filter will throw an exception if an event arrives that is delayed, relative to the other events, by more than the maximum wait time. Pseudo-code illustrating this algorithm can be found at the end of this page.

MultiInputPanel - A simple extension to FilterPanel which adds controls for the user to define the sources of the event packets which feed into their filter. The sources can be input devices or other filters.

MultiInputFrame - A simple extension to FilterFrame which allows the filter controls to exist inside the main viewing window. It links to the ProcessingNetwork and rebuilds it when the user selects a new set of filters.

Figure 2: An example showing how event-packets flow through the processing network. Boxes indicate objects, black lines indicate the direction in which event-packets flow.

Figure 3: Class tree showing the modifications (green) to the original jAER system (blue).

Users who would like to use multi-input filters can use the source code shown below as an example.

/**
 * This filter shows how to use the event-source bit to identify the input 
 * device in a multi-sensory filter.  It simply prints the number of events 
 * received from each source to the output window.
 * @author Peter
 */
public class SampleMultiSensoryFilter extends MultiSourceProcessor{

    public SampleMultiSensoryFilter(AEChip chip) {
        super(chip);
    }

    @Override
    public String[] getInputNames() {
        return new String[]{"Retina","Cochlea"};
    }

    @Override
    public EventPacket<?> filterPacket(EventPacket<?> in) {

        int retEvents=0,cochEvents=0;

        // The "source" field was set when the input packets were merged:
        // index 0 corresponds to "Retina", index 1 to "Cochlea" (the order
        // returned by getInputNames()).
        for (Object o : in) {
            if (((BasicEvent) o).source == 0) {
                retEvents++;
            } else if (((BasicEvent) o).source == 1) {
                cochEvents++;
            }
        }
        System.err.println("Retina: " + retEvents + "\t Cochlea: " + cochEvents);
        return in;
    }

    @Override
    public void resetFilter() {}

    @Override
    public void initFilter() {}

}

Figure 4: A screenshot of the multi-input viewer in action. On the left, we see the control panel, with custom controls (outlined in blue) that have been added by a multi-source filter (AudioVisual Net). On the top right we see the events from the retina and cochlea, and on the bottom right we see a custom display that the AudioVisualNet filter has added.

Pseudo-code for merging event packets from different sources into a single packet (see the real implementation in the method MultiSourceProcessor.mergePackets(ArrayList<EventPacket> packets)):

// Initializations
lastTimestamp = -Infinity
buffers[] = new list of FIFO queues     // one buffer per input source
priorityQueue = new PriorityQueue       // orders events by timestamp

// Packet merging function
function outputPacket = mergePackets(packets)

  /* Step 1: Dump all packets into buffers, define max time to which
   * we may progress */
  goToTime = -Infinity
  for (i from 0 to packets.length-1)
  | buffers[i].addEventsFrom(packets[i])
  | goToTime = max(goToTime, packets[i].lastEventTime)
  goToTime -= maxWaitTime

  /* Step 2: If dead buffers have received events, revive them and add their
   * first elements back into the priority queue. */
  if not all buffers alive
  | for (i from 0 to buffers.length-1)
  | | if buffers[i].isNotAlive AND buffers[i].isNotEmpty
  | | | event = buffers[i].poll()
  | | | if event.timestamp < lastTimestamp
  | | | | error "Received timestamp from source after timeout"
  | | | event.source = i
  | | | priorityQueue.add(event)
  | | | buffers[i].isAlive = true

  /* Step 3: Merge buffers, generate an output packet */
  outputPacket = new EventPacket
  while (priorityQueue.isNotEmpty AND priorityQueue.firstElementTimestamp < goToTime)
  | event = priorityQueue.poll()
  | outputPacket.add(event)
  | lastTimestamp = event.timestamp
  |
  | // If the buffer this event came from is now empty, mark it dead and stop
  | if buffers[event.source].isEmpty
  | | buffers[event.source].isAlive = false
  | | break
  | else  // Replace the polled event with the next event from the same buffer
  | | nextEvent = buffers[event.source].poll()
  | | nextEvent.source = event.source
  | | priorityQueue.add(nextEvent)

  return outputPacket
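
For readers who prefer compilable code, here is a minimal Java sketch of the core merge step, using java.util.PriorityQueue and placeholder event/packet types instead of the jAER classes. It shows only the timestamp-ordered merge and the source tagging; the cross-iteration buffering and dead-source handling described above are omitted.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Placeholder types; shows only the timestamp-ordered merge and source tagging.
class MergeSketch {

    static class Event {
        final int timestamp;
        int source;                       // index of the input packet this event came from
        Event(int timestamp) { this.timestamp = timestamp; }
    }

    /** Merge several packets into one, ordered by timestamp, tagging each
     *  event with the index of the packet it came from. */
    static List<Event> mergePackets(List<List<Event>> packets) {
        PriorityQueue<Event> queue =
                new PriorityQueue<>(Comparator.comparingInt((Event e) -> e.timestamp));
        for (int i = 0; i < packets.size(); i++) {
            for (Event e : packets.get(i)) {
                e.source = i;             // the "source" bit described above
                queue.add(e);
            }
        }
        List<Event> merged = new ArrayList<>();
        while (!queue.isEmpty()) {
            merged.add(queue.poll());     // events come out in timestamp order
        }
        return merged;
    }
}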

To Do:

  • Allow filters to display their outputs to the GlobalViewer (see the GlobalViewer.addDisplayWriter(JPanel p) method). Currently, filters can manually add outputs, but the events that a filter produces cannot be directly displayed.

  • If input devices are not synchronized by a wire (in hardware) their clocks tend to drift out of sync. An option should be added to allow dynamic synchronization - something that detects a drift in timestamps and adjusts them to compensate, without breaking timestamp monotonicity.

  • Find a way to store the preferred inputs to a filter in the preference nodes so the user doesn't have to re-select them on every startup.

  • Move all the controls from AEViewer into the GlobalViewer interface. This involves separating those controls that are specific to an input device (these should go to the device window-panes) from those that are global (which should go to the top of the GlobalViewer display).

  • There is lots of optimization to be done. For instance, the system currently collects a batch of events and then processes it; collecting new events and processing the already-collected events could instead be done concurrently.

If you have questions, you can ask Peter (poconn4 ...at... (name of google's email service) ... dot ... com).


Related

Wiki: Adding a new event filter