
Integration of new components

Kristina Schaaff Lukas Platzek

Integration of new components into the xAffect framework

To follow this tutorial, it is important to learn about SystemComponents in the context of xAffect in general.
Be sure to read the articles about General information on xAffect, General information on Setups and, most importantly, General information on SystemComponents before proceeding with this article.

In this tutorial, we will build an AbstractProcessor step by step to demonstrate how SystemComponents work. The goal is to implement a sliding mean filter: a component that takes the arithmetic mean of the data received within an interval and transmits the result to other components.

Creating an empty SystemComponent

To create a new component, we will have to create a new class, extending one of the following abstract classes:

  • AbstractSource: Sources typically represent some kind of sensor data input. By definition they cannot receive data via xAffect; they only act as a data source and offer data to other SystemComponents.
  • AbstractSink: Sinks are the exact opposite of AbstractSources. They receive data from other SystemComponents, while not being able to offer data. AbstractSinks often implement logging tasks or route data to external software.
  • AbstractProcessor: Processors combine characteristics of Sinks and Sources. While being able to receive raw data from Sources or other Processors, they can also offer manipulated data to Sinks or further Processors. A Processor is the recommended SystemComponent for all signal processing components.
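
The division of roles above can be pictured with a small standalone sketch. It deliberately does not use any xAffect classes: the source, processor and sink are modelled with the standard java.util.function interfaces, and the class name ComponentRolesSketch and the method run are hypothetical. xAffect connects components through its own dispatching mechanism, so this only illustrates the direction of the data flow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical illustration of the three roles; not xAffect API.
final class ComponentRolesSketch
{
    // Pulls count values from the source (offers data only), passes each
    // through the processor (receives and offers data) and hands the
    // result to the sink (receives data only).
    static void run(Supplier<Double> source, UnaryOperator<Double> processor,
        Consumer<Double> sink, int count)
    {
        for (int i = 0; i < count; i++)
        {
            sink.accept(processor.apply(source.get()));
        }
    }

    public static void main(String[] args)
    {
        int[] counter = { 0 };
        Supplier<Double> source = () -> (double) ++counter[0]; // emits 1.0, 2.0, 3.0
        UnaryOperator<Double> processor = value -> value * 2.0; // manipulates the data
        List<Double> received = new ArrayList<>();              // the sink stores what it gets

        run(source, processor, received::add, 3);
        System.out.println(received); // prints [2.0, 4.0, 6.0]
    }
}
```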

For our example, the choice of SystemComponent type is easy: we need to receive data and return processed data, which matches an AbstractProcessor. The new class ProcessorSlidingMean therefore extends the abstract class AbstractProcessor and implements its required methods:

#!java
public class ProcessorSlidingMean extends AbstractProcessor
{
    public ProcessorSlidingMean(LifeCycleManager manager, 
        HashMap<String, String> configuration) throws InstantiationException
    {
        // pass LifeCycleManager and configuration to AbstractProcessor
        super(manager, configuration);
        // convert configuration into input and output descriptions
        // configure component
    }

    @Override
    public DataDescription[] getInputDescription()
    {
        // return DataDescription for incoming data
        return null;
    }

    @Override
    public DataDescription[] getOutputDescription()
    {
        // return DataDescription for outgoing data
        return null;
    }

    @Override
    protected void configure()
    {
        // set up required data-structures,
        // connect to databases, etc.
    }

    @Override
    protected void processData(DataEntry entry)
    {
        // process incoming data
    }

    @Override
    protected void train()
    {
        // indicates that the component's internal state changed to "training"
        // -> start producing data
    }

    @Override
    protected void measure()
    {
        // indicates that the component's internal state changed to "measuring"
        // -> start producing relevant data
    }

    @Override
    protected void stop()
    {
        // indicates that the component's internal state changed to "stopped"
        // free allocated memory / connections
    }
}

Construction and generation of DataDescriptions

The LifeCycleManager coordinates the lifecycle of all components. Instantiating a SystemComponent requires the LifeCycleManager and a HashMap of configuration parameters. To pass this information to the superclass, call super(manager, configuration) as the first statement in your SystemComponent's constructor.
Afterwards, every SystemComponent is supposed to define its incoming and/or outgoing data, which are returned by the methods getInputDescription() and getOutputDescription(). Note that the SystemComponent in this tutorial is an AbstractProcessor, which requires both an input and an output description, while AbstractSources and AbstractSinks require only one of the two.
To generate the required descriptions, we use the information gathered in our configuration HashMap. It has proven to be best practice to define the parameter names expected in the configuration as public static final Strings, so that a setup can use those constants to set up the component's configuration.
For our example component, we need to know the names of the incoming and outgoing data, their sample rate (which is the same for both), the number of channels to calculate the mean value for, and the interval over which to calculate the mean value.
This information will be transformed into two data descriptions, which will be returned by the respective methods:

#!java
// parameter for the incoming data's name
public static final String CONFIG_CONTENT_CLASS_INPUT = "CONTENTCLASSINPUT";
// parameter for the outgoing data's name
public static final String CONFIG_CONTENT_CLASS_OUTPUT = "CONTENTCLASSOUTPUT";
// parameter for the incoming and outgoing data's sample-rate
public static final String CONFIG_SAMPLE_RATE = "SAMPLERATE";
// parameter for the incoming and outgoing data's number of channels
public static final String CONFIG_NUMBER_OF_CHANNELS = "NUMBEROFCHANNELS";
// parameter defining the interval size on which to perform the algorithm
public static final String CONFIG_WINDOW_WIDTH = "INTERVALSIZE";

// The DataDescriptions for our component
private DataDescription outputDescription;
private DataDescription inputDescription;

// default-value for the interval
private int intervalWidth = 5;

public ProcessorSlidingMean(LifeCycleManager manager, 
    HashMap<String, String> configuration) throws InstantiationException
{
    // pass LifeCycleManager and configuration to AbstractProcessor
    super(manager, configuration);

    //check whether required parameters are set
    if (configuration.containsKey(CONFIG_CONTENT_CLASS_INPUT)
            && configuration.containsKey(CONFIG_CONTENT_CLASS_OUTPUT)
            && configuration.containsKey(CONFIG_SAMPLE_RATE)
            && configuration.containsKey(CONFIG_NUMBER_OF_CHANNELS))
    {
        try
        {
            // convert information given as String to
            // double and integer
            double sampleRate = Double.parseDouble(
                configuration.get(CONFIG_SAMPLE_RATE));
            int numberOfChannels = Integer.parseInt(
                configuration.get(CONFIG_NUMBER_OF_CHANNELS));

            // define input- and output-descriptions
            // note that this component will only be able to handle DataSignals
            this.inputDescription = new DataDescription(
                configuration.get(CONFIG_CONTENT_CLASS_INPUT), DataType.SIGNAL,
                sampleRate, numberOfChannels);

            this.outputDescription = new DataDescription(
                configuration.get(CONFIG_CONTENT_CLASS_OUTPUT), DataType.SIGNAL,
                sampleRate, numberOfChannels);
        }
        catch (NumberFormatException e)
        {
            // throw InstantiationException containing error-message
            throw new InstantiationException(
                "Error parsing configuration");
        }
    }
    else
    {
        // throw InstantiationException containing error-message
        throw new InstantiationException(
            "\"CONFIG_CONTENT_CLASS_INPUT\", "
            + "\"CONFIG_CONTENT_CLASS_OUTPUT\", "
            + "\"CONFIG_SAMPLE_RATE\" and "
            + "\"CONFIG_NUMBER_OF_CHANNELS\" are mandatory!");
    }

    // try to set intervalWidth if configured, else use default
    if (configuration.containsKey(CONFIG_WINDOW_WIDTH))
    {
        try
        {
            this.intervalWidth = Integer.parseInt(
                configuration.get(CONFIG_WINDOW_WIDTH));
        }
        catch (NumberFormatException e)
        {
            throw new InstantiationException(
                "Cannot parse \"CONFIG_WINDOW_WIDTH\" to integer.");
        }
    }
}
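
To make the role of the String constants more concrete, the following standalone sketch shows how a setup might fill such a configuration HashMap and how the optional interval parameter falls back to a default. It does not depend on xAffect: the class SlidingMeanConfigSketch and its two helper methods are hypothetical, the key strings are copied from the constants above, and the values (including the content class names RAW_SIGNAL and SMOOTHED_SIGNAL) are made up for illustration.

```java
import java.util.HashMap;

// Standalone sketch; key strings mirror the tutorial's constants,
// all values are made-up examples.
final class SlidingMeanConfigSketch
{
    static final String CONFIG_CONTENT_CLASS_INPUT = "CONTENTCLASSINPUT";
    static final String CONFIG_CONTENT_CLASS_OUTPUT = "CONTENTCLASSOUTPUT";
    static final String CONFIG_SAMPLE_RATE = "SAMPLERATE";
    static final String CONFIG_NUMBER_OF_CHANNELS = "NUMBEROFCHANNELS";
    static final String CONFIG_WINDOW_WIDTH = "INTERVALSIZE";

    // Builds a configuration the way a setup might; every value is a String.
    static HashMap<String, String> exampleConfiguration()
    {
        HashMap<String, String> configuration = new HashMap<>();
        configuration.put(CONFIG_CONTENT_CLASS_INPUT, "RAW_SIGNAL");       // hypothetical name
        configuration.put(CONFIG_CONTENT_CLASS_OUTPUT, "SMOOTHED_SIGNAL"); // hypothetical name
        configuration.put(CONFIG_SAMPLE_RATE, "256.0");
        configuration.put(CONFIG_NUMBER_OF_CHANNELS, "2");
        configuration.put(CONFIG_WINDOW_WIDTH, "10");
        return configuration;
    }

    // Optional parameter: use the default when the key is absent,
    // reject values that cannot be parsed as an integer.
    static int parseWindowWidth(HashMap<String, String> configuration,
        int defaultWidth) throws InstantiationException
    {
        if (!configuration.containsKey(CONFIG_WINDOW_WIDTH))
        {
            return defaultWidth;
        }
        try
        {
            return Integer.parseInt(configuration.get(CONFIG_WINDOW_WIDTH));
        }
        catch (NumberFormatException e)
        {
            throw new InstantiationException(
                "Cannot parse \"" + CONFIG_WINDOW_WIDTH + "\" to integer.");
        }
    }

    public static void main(String[] args) throws InstantiationException
    {
        HashMap<String, String> configuration = exampleConfiguration();
        System.out.println(parseWindowWidth(configuration, 5)); // prints 10

        configuration.remove(CONFIG_WINDOW_WIDTH);
        System.out.println(parseWindowWidth(configuration, 5)); // prints 5
    }
}
```

Because the configuration carries only Strings, every numeric parameter has to be parsed, and a malformed value is best reported at construction time rather than during processing.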

To provide the information on incoming and outgoing data for the dispatching-algorithm, two methods getInputDescription() and getOutputDescription() are required. For our example, we can just return the descriptions we built in the constructor:

#!java
@Override
public DataDescription[] getInputDescription()
{
    // returns the previously built inputDescription
    return new DataDescription[] { this.inputDescription };
}

@Override
public DataDescription[] getOutputDescription()
{
    // returns the previously built outputDescription
    return new DataDescription[] { this.outputDescription };
}

Configuration

After the construction of every SystemComponent, the LifeCycleManager triggers the configuration of all components in its component pool. Inside a SystemComponent, this becomes noticeable through a call to the method configure(). During configuration, components are supposed to construct data structures, establish connections to databases, etc.
For our processor, we will need a Vector that serves as a buffer for incoming data, so that we are able to perform the sliding-mean-algorithm:

#!java
// we declare the interval as private property...
private Vector<double[]> interval;

@Override
protected void configure()
{
    // ...and instantiate it in our configuration-method
    this.interval = new Vector<double[]>();
}

Processing and generating data

The method call that usually follows configure() is train(), which signals the transition to the component's training state, followed by measure(), which signals the transition to the measuring state. These methods are used to differentiate between the quality of produced (or consumed) data: data arriving or leaving during the measuring state is considered valuable for the study's result, whereas data occurring in the training state is used solely to train components.
For our processor, we don't have to differentiate between those classes of data, because a sliding-mean-filter always works in the same way, so we leave those methods blank.
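For contrast, a component that does need to tell the two states apart could remember which transition it saw last. The following is only a sketch under that assumption: StateAwareSketch is a hypothetical standalone class that does not extend any xAffect class, and its State enum and the two lists are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a component; not xAffect API.
final class StateAwareSketch
{
    enum State { CONFIGURED, TRAINING, MEASURING, STOPPED }

    private State state = State.CONFIGURED;
    private final List<Double> trainingData = new ArrayList<>();
    private final List<Double> measuredData = new ArrayList<>();

    // Each callback only records the state transition.
    void train()   { this.state = State.TRAINING; }
    void measure() { this.state = State.MEASURING; }
    void stop()    { this.state = State.STOPPED; }

    // Routes a value depending on the state the component is in.
    void processData(double value)
    {
        switch (this.state)
        {
            case TRAINING:
                this.trainingData.add(value);
                break;
            case MEASURING:
                this.measuredData.add(value);
                break;
            default:
                // ignore data outside the training and measuring states
                break;
        }
    }

    List<Double> getTrainingData() { return this.trainingData; }
    List<Double> getMeasuredData() { return this.measuredData; }
}
```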

Data-consuming components (AbstractProcessors and AbstractSinks) also require the method processData(), which receives the incoming data and is called every time new data is available for the component.
For our processor, this is where the magic happens:

#!java
@Override
protected void processData(DataEntry entry)
{
    // First of all, we add the new incoming data entry
    // to the interval we created in configure().
    double[] newValue = ((DataSignal) entry).getDataValue();
    this.interval.add(newValue);

    // Afterwards, we trim the interval to contain a maximum of
    // intervalWidth elements by dropping the oldest entries.
    while (this.interval.size() > this.intervalWidth)
    {
        this.interval.remove(0);
    }

    // Now we can sum up all values for every channel. The channel count
    // is taken from the incoming entry, since it is not stored as a field.
    int numberOfChannels = newValue.length;
    double[] sumValue = new double[numberOfChannels];
    for (double[] value : this.interval)
    {
        for (int i = 0; i < numberOfChannels; i++)
        {
            sumValue[i] += value[i];
        }
    }

    // ..., divide by the number of data entries we summed up...
    double[] meanValue = new double[sumValue.length];
    for (int i = 0; i < sumValue.length; i++)
    {
        meanValue[i] = sumValue[i] / interval.size();
    }

    // ...and send the result to all SystemComponents linked to us.
    super.getSourceConnector().writeOutput(
        new DataSignal(meanValue, this.outputDescription));
}
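
To try the filter logic without a running xAffect setup, the same algorithm can be written as a small standalone class. This is a sketch, not part of the framework: SlidingMeanSketch and its add method are hypothetical names, and an ArrayDeque replaces the Vector because only the oldest entry is ever removed.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Standalone sketch of the sliding mean; not xAffect API.
final class SlidingMeanSketch
{
    private final int windowWidth;
    private final Deque<double[]> window = new ArrayDeque<>();

    SlidingMeanSketch(int windowWidth)
    {
        if (windowWidth < 1)
        {
            throw new IllegalArgumentException("windowWidth must be >= 1");
        }
        this.windowWidth = windowWidth;
    }

    // Adds one multi-channel sample and returns the per-channel mean
    // over the most recent windowWidth samples.
    double[] add(double[] sample)
    {
        this.window.addLast(sample.clone());
        while (this.window.size() > this.windowWidth)
        {
            this.window.removeFirst(); // drop the oldest sample
        }

        double[] mean = new double[sample.length];
        for (double[] value : this.window)
        {
            for (int i = 0; i < mean.length; i++)
            {
                mean[i] += value[i];
            }
        }
        for (int i = 0; i < mean.length; i++)
        {
            mean[i] /= this.window.size();
        }
        return mean;
    }

    public static void main(String[] args)
    {
        SlidingMeanSketch filter = new SlidingMeanSketch(3);
        double[][] samples = { { 1.0, 10.0 }, { 2.0, 20.0 }, { 3.0, 30.0 }, { 4.0, 40.0 } };
        for (double[] sample : samples)
        {
            System.out.println(Arrays.toString(filter.add(sample)));
        }
    }
}
```

With a window width of 3, the first three results are the means of all samples seen so far ([1.0, 10.0], [1.5, 15.0], [2.0, 20.0]). The fourth sample pushes the first one out of the window, so the result becomes [3.0, 30.0].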

Related

Wiki: General information on Setups
Wiki: General information on SystemComponents
Wiki: General information on xAffect
Wiki: xAffect home