To follow this tutorial, it is important to learn about SystemComponents in the context of xAffect in general.
Be sure to read the articles about General information on xAffect, General information on Setups and, most importantly, General information on SystemComponents before proceeding with this article.
In this tutorial, we will build an AbstractProcessor step by step to demonstrate the functionality of SystemComponents. The objective is to implement a component that acts as a sliding mean filter: it takes the arithmetic mean of the data received within an interval and transmits the result to other components.
To create a new component, we have to create a new class that extends one of the abstract SystemComponent classes: AbstractSource, AbstractProcessor or AbstractSink.
For our example, choosing the type of SystemComponent is easy: a component that receives data and returns processed data matches an AbstractProcessor, so the new class ProcessorSlidingMean will extend the abstract class AbstractProcessor and implement its required methods:
#!java
public class ProcessorSlidingMean extends AbstractProcessor
{
    public ProcessorSlidingMean(LifeCycleManager manager,
            HashMap<String, String> configuration) throws InstantiationException
    {
        // convert configuration into input- and outputdescription
        // configure component
        super(manager, configuration);
    }
    @Override
    public DataDescription[] getInputDescription()
    {
        // return DataDescription for incoming data
        return null;
    }
    @Override
    public DataDescription[] getOutputDescription()
    {
        // return DataDescription for outgoing data
        return null;
    }
    @Override
    protected void configure()
    {
        // set up required data-structures,
        // connect to databases, etc.
    }
    @Override
    protected void processData(DataEntry entry)
    {
        // process incoming data
    }
    @Override
    protected void train()
    {
        // indicates that the component's internal state changed to "training"
        // -> start producing data
    }
    @Override
    protected void measure()
    {
        // indicates that the component's internal state changed to "measuring"
        // -> start producing relevant data
    }
    @Override
    protected void stop()
    {
        // indicates that the component's internal state changed to "stopped"
        // free allocated memory / connections
    }
}
The LifeCycleManager coordinates the lifecycle of all components. The LifeCycleManager and a HashMap of configuration parameters are required as parameters to instantiate a SystemComponent. To pass this information to the extended class, call super(manager, configuration) as the first statement in the constructor of the new SystemComponent.
Afterwards, every SystemComponent is supposed to define its incoming and/or outgoing data, which are returned by the methods getInputDescription() and getOutputDescription(). Note that the SystemComponent in this tutorial is an AbstractProcessor, which requires both an input and an output description, while AbstractSources and AbstractSinks require only one of the two.
To generate the required descriptions, we are going to use the information gathered in our configuration HashMap. It has proven to be best practice to define the parameter names used in the configuration as public static final Strings, so that a setup can use those constants to set up the component's configuration.
For our example component, we need to know the names of the incoming and outgoing data, their sample rate (which will be the same for incoming and outgoing data), the number of channels to calculate the mean value for, and the interval over which to calculate the mean value.
This information will be transformed into two data descriptions, which will be returned by the respective methods:
#!java
// parameter for the incoming data's name
public static final String CONFIG_CONTENT_CLASS_INPUT = "CONTENTCLASSINPUT";
// parameter for the outgoing data's name
public static final String CONFIG_CONTENT_CLASS_OUTPUT = "CONTENTCLASSOUTPUT";
// parameter for the incoming and outgoing data's sample-rate
public static final String CONFIG_SAMPLE_RATE = "SAMPLERATE";
// parameter for the incoming and outgoing data's number of channels
public static final String CONFIG_NUMBER_OF_CHANNELS = "NUMBEROFCHANNELS";
// parameter defining the interval-size on which to perform the algorithm
public static final String CONFIG_WINDOW_WIDTH = "INTERVALSIZE";
// The DataDescriptions for our component
private DataDescription outputDescription;
private DataDescription inputDescription;
// default-value for the interval
private int intervalWidth = 5;
public ProcessorSlidingMean(LifeCycleManager manager,
        HashMap<String, String> configuration) throws InstantiationException
{
    // pass LifeCycleManager and configuration to AbstractProcessor
    super(manager, configuration);
    // check whether required parameters are set
    if (configuration.containsKey(CONFIG_CONTENT_CLASS_INPUT)
            && configuration.containsKey(CONFIG_CONTENT_CLASS_OUTPUT)
            && configuration.containsKey(CONFIG_SAMPLE_RATE)
            && configuration.containsKey(CONFIG_NUMBER_OF_CHANNELS))
    {
        try
        {
            // convert information given as String to
            // double and integer
            double sampleRate = Double.parseDouble(
                    configuration.get(CONFIG_SAMPLE_RATE));
            int numberOfChannels = Integer.parseInt(
                    configuration.get(CONFIG_NUMBER_OF_CHANNELS));
            // define input- and output-descriptions
            // note that this component will only be able to handle DataSignals
            this.inputDescription = new DataDescription(
                    configuration.get(CONFIG_CONTENT_CLASS_INPUT), DataType.SIGNAL,
                    sampleRate, numberOfChannels);
            this.outputDescription = new DataDescription(
                    configuration.get(CONFIG_CONTENT_CLASS_OUTPUT), DataType.SIGNAL,
                    sampleRate, numberOfChannels);
        }
        catch (NumberFormatException e)
        {
            // throw InstantiationException containing error-message
            throw new InstantiationException(
                    "Error parsing configuration");
        }
    }
    else
    {
        // throw InstantiationException containing error-message
        throw new InstantiationException(
                "\"CONFIG_CONTENT_CLASS_INPUT\", "
                + "\"CONFIG_CONTENT_CLASS_OUTPUT\", "
                + "\"CONFIG_SAMPLE_RATE\" and "
                + "\"CONFIG_NUMBER_OF_CHANNELS\" are mandatory!");
    }
    // try to set intervalWidth if configured, else use default
    if (configuration.containsKey(CONFIG_WINDOW_WIDTH))
    {
        try
        {
            this.intervalWidth = Integer.parseInt(
                    configuration.get(CONFIG_WINDOW_WIDTH));
        }
        catch (NumberFormatException e)
        {
            throw new InstantiationException(
                    "Cannot parse \"CONFIG_WINDOW_WIDTH\" to integer.");
        }
    }
}
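Seen from the setup's side, the constructor above expects a map of Strings keyed by the CONFIG_* constants. The following is only a minimal, standalone sketch of how such a map could be filled and how its numeric entries parse back. The class SlidingMeanConfigSketch, the helper buildConfiguration and the content class names "ECG_RAW" and "ECG_SMOOTHED" are invented for illustration; the key strings are copied from the constants above, and a real setup would presumably reference ProcessorSlidingMean.CONFIG_SAMPLE_RATE etc. directly.

```java
import java.util.HashMap;

// Hypothetical setup-side helper; not part of xAffect.
class SlidingMeanConfigSketch
{
    // Same values as the CONFIG_* constants of ProcessorSlidingMean.
    static final String CONFIG_CONTENT_CLASS_INPUT = "CONTENTCLASSINPUT";
    static final String CONFIG_CONTENT_CLASS_OUTPUT = "CONTENTCLASSOUTPUT";
    static final String CONFIG_SAMPLE_RATE = "SAMPLERATE";
    static final String CONFIG_NUMBER_OF_CHANNELS = "NUMBEROFCHANNELS";
    static final String CONFIG_WINDOW_WIDTH = "INTERVALSIZE";

    // All values are stored as Strings, because the component
    // receives a HashMap<String, String>.
    static HashMap<String, String> buildConfiguration(String input,
            String output, double sampleRate, int channels, int windowWidth)
    {
        HashMap<String, String> configuration = new HashMap<>();
        configuration.put(CONFIG_CONTENT_CLASS_INPUT, input);
        configuration.put(CONFIG_CONTENT_CLASS_OUTPUT, output);
        configuration.put(CONFIG_SAMPLE_RATE, Double.toString(sampleRate));
        configuration.put(CONFIG_NUMBER_OF_CHANNELS, Integer.toString(channels));
        // optional entry; the component falls back to its default without it
        configuration.put(CONFIG_WINDOW_WIDTH, Integer.toString(windowWidth));
        return configuration;
    }

    public static void main(String[] args)
    {
        HashMap<String, String> configuration =
                buildConfiguration("ECG_RAW", "ECG_SMOOTHED", 256.0, 2, 10);
        // The numeric entries parse back the same way the constructor does it.
        double sampleRate = Double.parseDouble(
                configuration.get(CONFIG_SAMPLE_RATE));
        int channels = Integer.parseInt(
                configuration.get(CONFIG_NUMBER_OF_CHANNELS));
        System.out.println(sampleRate + " Hz, " + channels + " channels");
    }
}
```

Because every value travels as a String, a typo such as "2 channels" only surfaces when the component parses it, which is why the constructor turns a NumberFormatException into an InstantiationException.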
To provide the information on incoming and outgoing data for the dispatching-algorithm, two methods getInputDescription() and getOutputDescription() are required. For our example, we can just return the descriptions we built in the constructor:
#!java
@Override
public DataDescription[] getInputDescription()
{
    // returns the previously built inputDescription
    return new DataDescription[] { this.inputDescription };
}
@Override
public DataDescription[] getOutputDescription()
{
    // returns the previously built outputDescription
    return new DataDescription[] { this.outputDescription };
}
After every SystemComponent has been constructed, the LifeCycleManager triggers the configuration of all components in its component pool. Inside a SystemComponent, this shows up as a call to the method configure(). During configuration, components are supposed to construct data structures, build connections to databases, etc.
For our processor, we will need a Vector that serves as a buffer for incoming data, so that we are able to perform the sliding-mean-algorithm:
#!java
// we declare the interval as private property...
private Vector<double[]> interval;
@Override
protected void configure()
{
    // ...and instantiate it in our configuration-method
    this.interval = new Vector<double[]>();
}
The method call that usually follows configure() is train(), which signals the transition to the component's training state, followed by measure(), which signals the transition to the measuring state. These methods make it possible to distinguish data by its relevance: data arriving or leaving during the measuring state is considered valuable for the study's result, whereas data occurring in the training state is used solely to train components.
For our processor, we don't have to differentiate between those classes of data, because a sliding-mean-filter always works in the same way, so we leave those methods blank.
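For components that do care about the difference, the state callbacks can simply record the current phase. The following standalone mock is only a sketch of that idea: the class PhaseCounterSketch and its State enum are invented for illustration and do not use any xAffect classes, and processData takes a plain double instead of a DataEntry.

```java
// Standalone mock of a component that treats training and
// measuring data differently; not part of xAffect.
class PhaseCounterSketch
{
    enum State { CONFIGURED, TRAINING, MEASURING, STOPPED }

    private State state = State.CONFIGURED;
    private int trainingSamples = 0;
    private int measuringSamples = 0;

    // Each callback only remembers which phase the component is in.
    void train() { this.state = State.TRAINING; }
    void measure() { this.state = State.MEASURING; }
    void stop() { this.state = State.STOPPED; }

    // Counts the sample for the phase it arrived in.
    void processData(double value)
    {
        if (this.state == State.TRAINING)
        {
            this.trainingSamples++;
        }
        else if (this.state == State.MEASURING)
        {
            this.measuringSamples++;
        }
        // samples outside both phases are ignored in this sketch
    }

    int getTrainingSamples() { return this.trainingSamples; }
    int getMeasuringSamples() { return this.measuringSamples; }

    public static void main(String[] args)
    {
        PhaseCounterSketch component = new PhaseCounterSketch();
        component.train();
        component.processData(0.1);
        component.processData(0.2);
        component.measure();
        component.processData(0.3);
        component.stop();
        System.out.println(component.getTrainingSamples() + " training, "
                + component.getMeasuringSamples() + " measuring");
    }
}
```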
Data-consuming components (DataProcessors and DataSinks) also require the method processData(), which receives the incoming data and is called every time new data is available for the component.
For our processor, this is where the magic happens:
#!java
@Override
protected void processData(DataEntry entry)
{
    // First of all, we add the new incoming data entry
    // to the interval we created in configure().
    double[] newValue = ((DataSignal) entry).getDataValue();
    this.interval.add(newValue);
    // Afterwards, we trim the interval to contain a maximum of
    // intervalWidth elements by dropping the oldest entries.
    while (this.interval.size() > this.intervalWidth)
    {
        this.interval.remove(0);
    }
    // Now we can sum up all values for every channel
    // (the channel count is taken from the incoming entry)...
    double[] sumValue = new double[newValue.length];
    for (double[] value : this.interval)
    {
        for (int i = 0; i < sumValue.length; i++)
        {
            sumValue[i] += value[i];
        }
    }
    // ..., divide by the number of data entries we summed up...
    double[] meanValue = new double[sumValue.length];
    for (int i = 0; i < sumValue.length; i++)
    {
        meanValue[i] = sumValue[i] / this.interval.size();
    }
    // ...and send the result to all SystemComponents linked to us.
    super.getSourceConnector().writeOutput(
            new DataSignal(meanValue, this.outputDescription));
}
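To see the filter's behaviour without running a whole setup, the same algorithm can be tried in isolation. The following is a minimal sketch that uses only the Java standard library: the class SlidingMeanSketch and its add method are invented for illustration, an ArrayDeque replaces the Vector, and it assumes that every sample carries the same number of channels.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Standalone illustration of the sliding mean; not part of xAffect.
class SlidingMeanSketch
{
    private final int width;
    private final Deque<double[]> window = new ArrayDeque<>();

    SlidingMeanSketch(int width)
    {
        if (width < 1)
        {
            throw new IllegalArgumentException("width must be at least 1");
        }
        this.width = width;
    }

    // Adds one multi-channel sample and returns the per-channel mean
    // over the most recent samples (at most `width` of them).
    double[] add(double[] sample)
    {
        this.window.addLast(sample.clone());
        // drop the oldest samples until the window fits again
        while (this.window.size() > this.width)
        {
            this.window.removeFirst();
        }
        double[] mean = new double[sample.length];
        for (double[] value : this.window)
        {
            for (int i = 0; i < mean.length; i++)
            {
                mean[i] += value[i];
            }
        }
        for (int i = 0; i < mean.length; i++)
        {
            mean[i] /= this.window.size();
        }
        return mean;
    }

    public static void main(String[] args)
    {
        SlidingMeanSketch filter = new SlidingMeanSketch(3);
        double[][] samples = {
            { 1.0, 10.0 }, { 2.0, 20.0 }, { 3.0, 30.0 }, { 4.0, 40.0 }
        };
        for (double[] sample : samples)
        {
            System.out.println(Arrays.toString(filter.add(sample)));
        }
        // prints:
        // [1.0, 10.0]
        // [1.5, 15.0]
        // [2.0, 20.0]
        // [3.0, 30.0]
    }
}
```

Note that the first outputs are means over fewer than three samples, because the window is still filling up; the processor in this tutorial behaves the same way.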
Wiki: General information on Setups
Wiki: General information on SystemComponents
Wiki: General information on xAffect
Wiki: xAffect home