From: <tan...@us...> - 2009-08-28 19:20:03
|
Revision: 941 http://cishell.svn.sourceforge.net/cishell/?rev=941&view=rev Author: tankchintan Date: 2009-08-28 19:19:55 +0000 (Fri, 28 Aug 2009) Log Message: ----------- Initial import of the Streaming prototype plugin. Till now code developed by Russell, Micah, Joseph. Added Paths: ----------- trunk/testing/org.cishell.streaming.prototype/.classpath trunk/testing/org.cishell.streaming.prototype/.project trunk/testing/org.cishell.streaming.prototype/.settings/ trunk/testing/org.cishell.streaming.prototype/.settings/org.eclipse.pde.core.prefs trunk/testing/org.cishell.streaming.prototype/META-INF/ trunk/testing/org.cishell.streaming.prototype/META-INF/MANIFEST.MF trunk/testing/org.cishell.streaming.prototype/OSGI-INF/ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.properties trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.xml trunk/testing/org.cishell.streaming.prototype/OSGI-INF/l10n/ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/l10n/bundle_en.properties trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.properties trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.xml trunk/testing/org.cishell.streaming.prototype/OSGI-INF/metatype/ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/metatype/METADATA.XML trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.properties trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.xml trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.properties trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.xml trunk/testing/org.cishell.streaming.prototype/build/ trunk/testing/org.cishell.streaming.prototype/build.properties trunk/testing/org.cishell.streaming.prototype/src/ trunk/testing/org.cishell.streaming.prototype/src/org/ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/IncreasingNumberStreamAlgorithm.java trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/LoggingConsumerAlgorithm.java trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/Printer.java trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/PrinterFactory.java trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/RandomNumberStream.java trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerAlgorithm.java trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerFactory.java trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/ConsumerAlgorithm.java trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/DefaultStream.java trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/Stream.java trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/StreamAlgorithm.java trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/Stream.java trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/StreamImpl.java Added: trunk/testing/org.cishell.streaming.prototype/.classpath =================================================================== --- trunk/testing/org.cishell.streaming.prototype/.classpath (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/.classpath 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,7 @@ +<?xml version="1.0" encoding="UTF-8"?> +<classpath> + <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> + <classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/> + <classpathentry kind="src" path="src"/> + <classpathentry kind="output" path="build"/> +</classpath> Added: trunk/testing/org.cishell.streaming.prototype/.project =================================================================== --- trunk/testing/org.cishell.streaming.prototype/.project (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/.project 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,33 @@ +<?xml version="1.0" encoding="UTF-8"?> +<projectDescription> + <name>org.cishell.streaming.prototype</name> + <comment></comment> + <projects> + </projects> + <buildSpec> + <buildCommand> + <name>org.eclipse.jdt.core.javabuilder</name> + <arguments> + </arguments> + </buildCommand> + <buildCommand> + <name>org.eclipse.pde.ManifestBuilder</name> + <arguments> + </arguments> + </buildCommand> + <buildCommand> + <name>org.eclipse.pde.SchemaBuilder</name> + <arguments> + </arguments> + </buildCommand> + <buildCommand> + <name>org.eclipse.pde.ds.core.builder</name> + <arguments> + </arguments> + </buildCommand> + </buildSpec> + <natures> + <nature>org.eclipse.pde.PluginNature</nature> + <nature>org.eclipse.jdt.core.javanature</nature> + </natures> +</projectDescription> Added: trunk/testing/org.cishell.streaming.prototype/.settings/org.eclipse.pde.core.prefs =================================================================== --- trunk/testing/org.cishell.streaming.prototype/.settings/org.eclipse.pde.core.prefs (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/.settings/org.eclipse.pde.core.prefs 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,5 @@ +#Thu Aug 27 13:26:25 EDT 2009 +eclipse.preferences.version=1 +pluginProject.equinox=false +pluginProject.extensions=false +resolve.requirebundle=false Added: trunk/testing/org.cishell.streaming.prototype/META-INF/MANIFEST.MF =================================================================== --- trunk/testing/org.cishell.streaming.prototype/META-INF/MANIFEST.MF (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/META-INF/MANIFEST.MF 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,20 @@ +Manifest-Version: 1.0 +Bundle-ManifestVersion: 2 +Bundle-Name: Streaming Data Prototype +Bundle-SymbolicName: org.cishell.streaming.prototype +Bundle-Version: 0.0.1 +Bundle-ClassPath: . +Import-Package: org.cishell.app.service.datamanager, + org.cishell.app.service.scheduler, + org.cishell.framework;version="1.0.0", + org.cishell.framework.algorithm;version="1.0.0", + org.cishell.framework.data;version="1.0.0", + org.cishell.framework.userprefs;version="1.0.0", + org.osgi.framework;version="1.3.0", + org.osgi.service.cm;version="1.2.0", + org.osgi.service.component;version="1.0.0", + org.osgi.service.log;version="1.3.0", + org.osgi.service.metatype;version="1.1.0", + org.osgi.service.prefs;version="1.1.0" +X-AutoStart: true +Service-Component: OSGI-INF/increasingnumberstream.xml, OSGI-INF/loggingconsumer.xml Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.properties =================================================================== --- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.properties (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.properties 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,7 @@ +menu_path=Streaming/start +label=Streaming Data Prototype +description=Stream data. +in_data=null +out_data=org.cishell.streaming.prototype.streamcore.Stream +service.pid=org.cishell.streaming.prototype.IncreasingNumberStreamAlgorithm +remoteable=true Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.xml =================================================================== --- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.xml (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.xml 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,10 @@ +<?xml version="1.0" encoding="UTF-8"?> +<component name="org.cishell.streaming.prototype.IncreasingNumberStreamAlgorithm.component" immediate="false"> + <implementation class="org.cishell.streaming.prototype.IncreasingNumberStreamAlgorithm$Factory"/> + <properties entry="OSGI-INF/increasingnumberstream.properties"/> + + <service> + <provide interface= + "org.cishell.framework.algorithm.AlgorithmFactory"/> + </service> +</component> \ No newline at end of file Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/l10n/bundle_en.properties =================================================================== --- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/l10n/bundle_en.properties (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/l10n/bundle_en.properties 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,7 @@ +#Localization variables for OSGI-INF/metatatype/METADATA.XML +# +#Samples: +#input=Input +#desc=Enter an integer (that will be converted to a string) +#name=Input->String +#name_desc=Converts inputted integer to string Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.properties =================================================================== --- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.properties (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.properties 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,7 @@ +menu_path=Streaming/start +label=Printing Data Prototype +description=Print data. +in_data=org.cishell.streaming.prototype.streamcore.Stream +out_data=null +service.pid=org.cishell.streaming.prototype.LoggingConsumerAlgorithm +remoteable=true Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.xml =================================================================== --- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.xml (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.xml 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,10 @@ +<?xml version="1.0" encoding="UTF-8"?> +<component name="org.cishell.streaming.prototype.LoggingConsumerAlgorithm.component" immediate="false"> + <implementation class="org.cishell.streaming.prototype.LoggingConsumerAlgorithm$Factory"/> + <properties entry="OSGI-INF/loggingconsumer.properties"/> + + <service> + <provide interface= + "org.cishell.framework.algorithm.AlgorithmFactory"/> + </service> +</component> \ No newline at end of file Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/metatype/METADATA.XML =================================================================== --- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/metatype/METADATA.XML (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/metatype/METADATA.XML 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,9 @@ +<?xml version="1.0" encoding="UTF-8"?> +<metatype:MetaData xmlns:metatype="http://www.osgi.org/xmlns/metatype/v1.0.0"> + <OCD name="Streaming Data Prototype" id="org.cishell.streaming.prototype.Streamer.OCD" + description="Stream data. "> + </OCD> + <Designate pid="org.cishell.streaming.prototype.Streamer"> + <Object ocdref="org.cishell.streaming.prototype.Streamer.OCD" /> + </Designate> +</metatype:MetaData> Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.properties =================================================================== --- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.properties (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.properties 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,7 @@ +menu_path=Streaming/start +label=Printing Data Prototype +description=Print data. +out_data=null +in_data=org.cishell.streaming.prototype.streamlib.Stream +service.pid=org.cishell.streaming.prototype.Printer +remoteable=true Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.xml =================================================================== --- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.xml (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.xml 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,10 @@ +<?xml version="1.0" encoding="UTF-8"?> +<component name="org.cishell.streaming.prototype.Printer.component" immediate="false"> + <implementation class="org.cishell.streaming.prototype.PrinterFactory"/> + <properties entry="OSGI-INF/printer.properties"/> + + <service> + <provide interface= + "org.cishell.framework.algorithm.AlgorithmFactory"/> + </service> +</component> \ No newline at end of file Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.properties =================================================================== --- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.properties (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.properties 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,7 @@ +menu_path=Streaming/start +label=Streaming Data Prototype +description=Stream data. +in_data=null +out_data=org.cishell.streaming.prototype.streamlib.Stream +service.pid=org.cishell.streaming.prototype.Streamer +remoteable=true Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.xml =================================================================== --- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.xml (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.xml 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,10 @@ +<?xml version="1.0" encoding="UTF-8"?> +<component name="org.cishell.streaming.prototype.Streamer.component" immediate="false"> + <implementation class="org.cishell.streaming.prototype.StreamerFactory"/> + <properties entry="OSGI-INF/streamer.properties"/> + + <service> + <provide interface= + "org.cishell.framework.algorithm.AlgorithmFactory"/> + </service> +</component> \ No newline at end of file Added: trunk/testing/org.cishell.streaming.prototype/build.properties =================================================================== --- trunk/testing/org.cishell.streaming.prototype/build.properties (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/build.properties 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,5 @@ +source.. = src/ +output.. = build/ +bin.includes = META-INF/,\ + .,\ + OSGI-INF/ Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/IncreasingNumberStreamAlgorithm.java =================================================================== --- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/IncreasingNumberStreamAlgorithm.java (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/IncreasingNumberStreamAlgorithm.java 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,49 @@ +package org.cishell.streaming.prototype; + +import java.util.Dictionary; + +import org.cishell.framework.CIShellContext; +import org.cishell.framework.algorithm.Algorithm; +import org.cishell.framework.algorithm.AlgorithmFactory; +import org.cishell.framework.data.Data; +import org.cishell.streaming.prototype.streamcore.DefaultStream; +import org.cishell.streaming.prototype.streamcore.Stream; +import org.cishell.streaming.prototype.streamcore.StreamAlgorithm; + +public class IncreasingNumberStreamAlgorithm extends StreamAlgorithm<Integer> { + @SuppressWarnings("unchecked") // TODO + public IncreasingNumberStreamAlgorithm(Data[] data, Dictionary parameters, + CIShellContext context) { + super(data, parameters, context); + } + + @Override + protected Stream<Integer> createStream() { + return new IncreasingNumberStream(); + } + + private class IncreasingNumberStream extends DefaultStream<Integer> { + public static final int MAX_EMISSIONS = 20; + + private int numEmissions = 0; + + public Integer next() { + // TODO Debug only + System.out.println("About to produce " + (numEmissions)); + return numEmissions++; + } + + public boolean isFinished() { + return numEmissions == MAX_EMISSIONS; + } + } + + public static class Factory implements AlgorithmFactory { + @SuppressWarnings("unchecked") // TODO + public Algorithm createAlgorithm(Data[] data, Dictionary parameters, + CIShellContext context) { + return new IncreasingNumberStreamAlgorithm( + data, parameters, context); + } + } +} Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/LoggingConsumerAlgorithm.java =================================================================== --- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/LoggingConsumerAlgorithm.java (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/LoggingConsumerAlgorithm.java 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,39 @@ +package org.cishell.streaming.prototype; + +import java.util.Dictionary; + +import org.cishell.framework.CIShellContext; +import org.cishell.framework.algorithm.Algorithm; +import org.cishell.framework.algorithm.AlgorithmFactory; +import org.cishell.framework.data.Data; +import org.cishell.streaming.prototype.streamcore.ConsumerAlgorithm; +import org.osgi.service.log.LogService; + +public class LoggingConsumerAlgorithm extends ConsumerAlgorithm<Object> { + private LogService logger; + + + @SuppressWarnings("unchecked") // TODO + public LoggingConsumerAlgorithm( + Data[] data, Dictionary parameters, CIShellContext context) { + super(data, parameters, context); + + this.logger = + (LogService) context.getService(LogService.class.getName()); + } + + + @Override + public void consume(Object value) { + logger.log(LogService.LOG_INFO, "Consuming " + value); + } + + + public static class Factory implements AlgorithmFactory { + @SuppressWarnings("unchecked") // TODO + public Algorithm createAlgorithm( + Data[] data, Dictionary parameters, CIShellContext context) { + return new LoggingConsumerAlgorithm(data, parameters, context); + } + } +} Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/Printer.java =================================================================== --- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/Printer.java (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/Printer.java 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,38 @@ +package org.cishell.streaming.prototype; + +import java.util.Dictionary; + +import org.cishell.framework.CIShellContext; +import org.cishell.framework.algorithm.Algorithm; +import org.cishell.framework.algorithm.AlgorithmExecutionException; +import org.cishell.framework.data.Data; +import org.cishell.streaming.prototype.streamlib.Stream; +import org.osgi.service.log.LogService; + +public class Printer implements Algorithm { + + private Stream<String> stream; + private LogService logger; + + + public Printer(Data[] data, Dictionary parameters, CIShellContext context) { + this.stream = (Stream<String>) data[0].getData(); + this.logger = (LogService) context.getService(LogService.class.getName()); + } + + public Data[] execute() throws AlgorithmExecutionException { + + int nextToPrint = 0; + while(!stream.isFinalEndpoint(nextToPrint)) { + int endpoint = stream.getCurrentEndpoint(); + for(int ii = nextToPrint; ii < endpoint; ii++) { + logger.log(LogService.LOG_INFO, "Received: " + stream.getValueAtTimestep(ii)); + } + nextToPrint = endpoint; + try { + Thread.sleep(100); + } catch (InterruptedException e) {} + } + return null; + } +} \ No newline at end of file Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/PrinterFactory.java =================================================================== --- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/PrinterFactory.java (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/PrinterFactory.java 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,14 @@ +package org.cishell.streaming.prototype; + +import java.util.Dictionary; + +import org.cishell.framework.CIShellContext; +import org.cishell.framework.algorithm.Algorithm; +import org.cishell.framework.algorithm.AlgorithmFactory; +import org.cishell.framework.data.Data; + +public class PrinterFactory implements AlgorithmFactory { + public Algorithm createAlgorithm(Data[] data, Dictionary parameters, CIShellContext context) { + return new Printer(data, parameters, context); + } +} \ No newline at end of file Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/RandomNumberStream.java =================================================================== --- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/RandomNumberStream.java (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/RandomNumberStream.java 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,19 @@ +package org.cishell.streaming.prototype; + +import org.cishell.streaming.prototype.streamlib.StreamImpl; + +public class RandomNumberStream extends StreamImpl<String> { + + public static final int MAX_EMISSIONS = 20; + + private int numEmissions = 0; + + public String yield() { + numEmissions++; + return "Generated " + Math.random() + " | Time step ID: " + numEmissions; + } + + public boolean isFinished() { + return numEmissions == MAX_EMISSIONS; + } +} Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerAlgorithm.java =================================================================== --- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerAlgorithm.java (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerAlgorithm.java 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,28 @@ +package org.cishell.streaming.prototype; + +import java.util.Dictionary; + +import org.cishell.framework.CIShellContext; +import org.cishell.framework.algorithm.Algorithm; +import org.cishell.framework.algorithm.AlgorithmExecutionException; +import org.cishell.framework.data.BasicData; +import org.cishell.framework.data.Data; +import org.cishell.streaming.prototype.streamlib.StreamImpl; + +public class StreamerAlgorithm implements Algorithm { + private Data[] data; + private Dictionary parameters; + private CIShellContext context; + + public StreamerAlgorithm(Data[] data, Dictionary parameters, CIShellContext context) { + this.data = data; + this.parameters = parameters; + this.context = context; + } + + public Data[] execute() throws AlgorithmExecutionException { + Data[] output = new Data[]{new BasicData(new RandomNumberStream(), StreamImpl.class.getName())}; + + return output; + } +} \ No newline at end of file Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerFactory.java =================================================================== --- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerFactory.java (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerFactory.java 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,14 @@ +package org.cishell.streaming.prototype; + +import java.util.Dictionary; + +import org.cishell.framework.CIShellContext; +import org.cishell.framework.algorithm.Algorithm; +import org.cishell.framework.algorithm.AlgorithmFactory; +import org.cishell.framework.data.Data; + +public class StreamerFactory implements AlgorithmFactory { + public Algorithm createAlgorithm(Data[] data, Dictionary parameters, CIShellContext context) { + return new StreamerAlgorithm(data, parameters, context); + } +} \ No newline at end of file Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/ConsumerAlgorithm.java =================================================================== --- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/ConsumerAlgorithm.java (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/ConsumerAlgorithm.java 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,47 @@ +package org.cishell.streaming.prototype.streamcore; + +import java.util.Dictionary; + +import org.cishell.framework.CIShellContext; +import org.cishell.framework.algorithm.Algorithm; +import org.cishell.framework.algorithm.AlgorithmExecutionException; +import org.cishell.framework.data.Data; + +public abstract class ConsumerAlgorithm<T> implements Algorithm { + private Stream<T> stream; + + + @SuppressWarnings("unchecked") // TODO + public ConsumerAlgorithm(Data[] data, Dictionary parameters, + CIShellContext context) { + this.stream = (Stream<T>) data[0].getData(); + } + + + public Data[] execute() throws AlgorithmExecutionException { + int nextToPrint = 0; + + while (!stream.isFinalEndpoint(nextToPrint)) { + int endpoint = stream.getCurrentEndpoint(); + + for (int ii = nextToPrint; ii < endpoint; ii++) { + // TODO Debug only + System.out.println("About to consume " + + stream.getValueAtTimestep(ii) + + " from timestep " + + ii); + consume(stream.getValueAtTimestep(ii)); + } + + nextToPrint = endpoint; + + try { + Thread.sleep(100); + } catch (InterruptedException e) {} + } + + return null; + } + + public abstract void consume(T value); +} \ No newline at end of file Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/DefaultStream.java =================================================================== --- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/DefaultStream.java (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/DefaultStream.java 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,85 @@ +package org.cishell.streaming.prototype.streamcore; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public abstract class DefaultStream<T> implements Stream<T> { + private static final long DEFAULT_SLEEP_TIME = 1000; + + private List<T> dataList = + Collections.synchronizedList(new ArrayList<T>()); + private Thread streamingThread; + + private boolean paused; + + private boolean stopped; + + + public DefaultStream() { + this(DEFAULT_SLEEP_TIME); + } + + public DefaultStream(final long sleepTime) { + this.streamingThread = new Thread(new Runnable() { + public void run() { + while(!isStopped() && !isFinished()) { + while (isPaused()) { + // TODO Debug only + System.out.println("Still paused!"); + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) {} + } + + dataList.add(next()); + + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) {} + } + } + }); + + streamingThread.start(); + } + + public void pause() { + setPaused(true); + } + public void unpause() { + setPaused(false); + } + public boolean isPaused() { + return paused; + } + public void setPaused(boolean paused) { + this.paused = paused; + } + + public void stop() { + setStopped(true); + } + public boolean isStopped() { + return stopped; + } + public void setStopped(boolean stopped) { + this.stopped = stopped; + } + + public abstract boolean isFinished(); + + + public int getCurrentEndpoint() { + return dataList.size(); + } + + public T getValueAtTimestep(int timestep) { + return dataList.get(timestep); + } + + public boolean isFinalEndpoint(int candidateEndpoint) { + return ((!streamingThread.isAlive()) + && (candidateEndpoint == getCurrentEndpoint())); + } +} Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/Stream.java =================================================================== --- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/Stream.java (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/Stream.java 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,12 @@ +package org.cishell.streaming.prototype.streamcore; + +public interface Stream<T> { + public T next(); + public void pause(); + public void unpause(); + public void stop(); + + public int getCurrentEndpoint(); + public T getValueAtTimestep(int timestep); + public boolean isFinalEndpoint(int candidateEndpoint); +} Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/StreamAlgorithm.java =================================================================== --- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/StreamAlgorithm.java (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/StreamAlgorithm.java 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,79 @@ +package org.cishell.streaming.prototype.streamcore; + +import java.util.Calendar; +import java.util.Dictionary; + +import org.cishell.app.service.datamanager.DataManagerListener; +import org.cishell.app.service.datamanager.DataManagerService; +import org.cishell.app.service.scheduler.SchedulerListener; +import org.cishell.app.service.scheduler.SchedulerService; +import org.cishell.framework.CIShellContext; +import org.cishell.framework.algorithm.Algorithm; +import org.cishell.framework.algorithm.AlgorithmExecutionException; +import org.cishell.framework.data.BasicData; +import org.cishell.framework.data.Data; + +public abstract class StreamAlgorithm<T> + implements Algorithm, DataManagerListener, SchedulerListener { + private Data outStreamContainer; + private Stream<T> stream; + private DataManagerService dataManager; + private SchedulerService scheduler; + + @SuppressWarnings("unchecked") + public StreamAlgorithm( + Data[] data, Dictionary parameters, CIShellContext context) { + this.dataManager = + (DataManagerService) context.getService( + DataManagerService.class.getName()); + dataManager.addDataManagerListener(this); + + this.scheduler = + (SchedulerService) context.getService( + SchedulerService.class.getName()); + scheduler.addSchedulerListener(this); + } + + + public Data[] execute() throws AlgorithmExecutionException { + this.stream = createStream(); + this.outStreamContainer = + new BasicData(stream, Stream.class.getName()); + return new Data[]{ outStreamContainer }; + } + + protected abstract Stream<T> createStream(); + + + // *** DataManagerListener (TODO: Implement as appropriate) + public void dataAdded(Data data, String label) {} + public void dataLabelChanged(Data data, String label) {} + public void dataSelected(Data[] data) {} + public void dataRemoved(Data data) { + if (outStreamContainer.equals(data)) { + stream.stop(); + dataManager.removeDataManagerListener(this); + scheduler.removeSchedulerListener(this); + } + } + // *** DataManagerListener (TODO: Implement as appropriate) + + + // *** SchedulerListener (TODO: Implement as appropriate) + public void algorithmScheduled(Algorithm algorithm, Calendar time) {} + public void algorithmRescheduled(Algorithm algorithm, Calendar time) {} + public void algorithmUnscheduled(Algorithm algorithm) {} + public void algorithmStarted(Algorithm algorithm) {} + public void algorithmFinished(Algorithm algorithm, Data[] createdData) {} + public void algorithmError(Algorithm algorithm, Throwable error) {} + public void schedulerRunStateChanged(boolean isRunning) { + System.out.println("Scheduler run state changed!"); + if (isRunning) { + stream.unpause(); + } else { + stream.pause(); + } + } + public void schedulerCleared() {} + // *** SchedulerListener (TODO: Implement as appropriate) +} \ No newline at end of file Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/Stream.java =================================================================== --- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/Stream.java (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/Stream.java 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,11 @@ +package org.cishell.streaming.prototype.streamlib; + +public interface Stream<T> { + + public abstract int getCurrentEndpoint(); + + public abstract T getValueAtTimestep(int timestep); + + public abstract boolean isFinalEndpoint(int candidateEndpoint); + +} Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/StreamImpl.java =================================================================== --- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/StreamImpl.java (rev 0) +++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/StreamImpl.java 2009-08-28 19:19:55 UTC (rev 941) @@ -0,0 +1,57 @@ +package org.cishell.streaming.prototype.streamlib; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public abstract class StreamImpl<T> implements Stream<T> { + + private static final long DEFAULT_SLEEP_TIME = 1000; + + private List<T> streamingList = Collections.synchronizedList(new ArrayList<T>()); + private Thread streamingThread; + + public StreamImpl() { + this(DEFAULT_SLEEP_TIME); + } + + public StreamImpl(final long sleepTime) { + this.streamingThread = new Thread(new Runnable(){ + public void run() { + while(! StreamImpl.this.isFinished()) { + StreamImpl.this.streamingList.add(StreamImpl.this.yield()); + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) {} + } + } + }); + + streamingThread.start(); + } + + + public abstract boolean isFinished(); + public abstract T yield(); + + /* (non-Javadoc) + * @see org.cishell.streaming.prototype.streamlib.Streamo#getCurrentEndpoint() + */ + public int getCurrentEndpoint() { + return streamingList.size(); + } + + /* (non-Javadoc) + * @see org.cishell.streaming.prototype.streamlib.Streamo#getValueAtTimestep(int) + */ + public T getValueAtTimestep(int timestep) { + return streamingList.get(timestep); + } + + /* (non-Javadoc) + * @see org.cishell.streaming.prototype.streamlib.Streamo#isFinalEndpoint(int) + */ + public boolean isFinalEndpoint(int candidateEndpoint) { + return !streamingThread.isAlive() && candidateEndpoint == getCurrentEndpoint(); + } +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |