|
From: <tan...@us...> - 2009-08-28 19:20:03
|
Revision: 941
http://cishell.svn.sourceforge.net/cishell/?rev=941&view=rev
Author: tankchintan
Date: 2009-08-28 19:19:55 +0000 (Fri, 28 Aug 2009)
Log Message:
-----------
Initial import of the Streaming prototype plugin. Till now code developed by Russell, Micah, Joseph.
Added Paths:
-----------
trunk/testing/org.cishell.streaming.prototype/.classpath
trunk/testing/org.cishell.streaming.prototype/.project
trunk/testing/org.cishell.streaming.prototype/.settings/
trunk/testing/org.cishell.streaming.prototype/.settings/org.eclipse.pde.core.prefs
trunk/testing/org.cishell.streaming.prototype/META-INF/
trunk/testing/org.cishell.streaming.prototype/META-INF/MANIFEST.MF
trunk/testing/org.cishell.streaming.prototype/OSGI-INF/
trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.properties
trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.xml
trunk/testing/org.cishell.streaming.prototype/OSGI-INF/l10n/
trunk/testing/org.cishell.streaming.prototype/OSGI-INF/l10n/bundle_en.properties
trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.properties
trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.xml
trunk/testing/org.cishell.streaming.prototype/OSGI-INF/metatype/
trunk/testing/org.cishell.streaming.prototype/OSGI-INF/metatype/METADATA.XML
trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.properties
trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.xml
trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.properties
trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.xml
trunk/testing/org.cishell.streaming.prototype/build/
trunk/testing/org.cishell.streaming.prototype/build.properties
trunk/testing/org.cishell.streaming.prototype/src/
trunk/testing/org.cishell.streaming.prototype/src/org/
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/IncreasingNumberStreamAlgorithm.java
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/LoggingConsumerAlgorithm.java
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/Printer.java
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/PrinterFactory.java
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/RandomNumberStream.java
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerAlgorithm.java
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerFactory.java
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/ConsumerAlgorithm.java
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/DefaultStream.java
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/Stream.java
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/StreamAlgorithm.java
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/Stream.java
trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/StreamImpl.java
Added: trunk/testing/org.cishell.streaming.prototype/.classpath
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/.classpath (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/.classpath 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
+ <classpathentry kind="src" path="src"/>
+ <classpathentry kind="output" path="build"/>
+</classpath>
Added: trunk/testing/org.cishell.streaming.prototype/.project
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/.project (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/.project 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>org.cishell.streaming.prototype</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.eclipse.pde.ManifestBuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.eclipse.pde.SchemaBuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.eclipse.pde.ds.core.builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.pde.PluginNature</nature>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
Added: trunk/testing/org.cishell.streaming.prototype/.settings/org.eclipse.pde.core.prefs
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/.settings/org.eclipse.pde.core.prefs (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/.settings/org.eclipse.pde.core.prefs 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,5 @@
+#Thu Aug 27 13:26:25 EDT 2009
+eclipse.preferences.version=1
+pluginProject.equinox=false
+pluginProject.extensions=false
+resolve.requirebundle=false
Added: trunk/testing/org.cishell.streaming.prototype/META-INF/MANIFEST.MF
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/META-INF/MANIFEST.MF (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/META-INF/MANIFEST.MF 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,20 @@
+Manifest-Version: 1.0
+Bundle-ManifestVersion: 2
+Bundle-Name: Streaming Data Prototype
+Bundle-SymbolicName: org.cishell.streaming.prototype
+Bundle-Version: 0.0.1
+Bundle-ClassPath: .
+Import-Package: org.cishell.app.service.datamanager,
+ org.cishell.app.service.scheduler,
+ org.cishell.framework;version="1.0.0",
+ org.cishell.framework.algorithm;version="1.0.0",
+ org.cishell.framework.data;version="1.0.0",
+ org.cishell.framework.userprefs;version="1.0.0",
+ org.osgi.framework;version="1.3.0",
+ org.osgi.service.cm;version="1.2.0",
+ org.osgi.service.component;version="1.0.0",
+ org.osgi.service.log;version="1.3.0",
+ org.osgi.service.metatype;version="1.1.0",
+ org.osgi.service.prefs;version="1.1.0"
+X-AutoStart: true
+Service-Component: OSGI-INF/increasingnumberstream.xml, OSGI-INF/loggingconsumer.xml
Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.properties
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.properties (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.properties 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,7 @@
+menu_path=Streaming/start
+label=Streaming Data Prototype
+description=Stream data.
+in_data=null
+out_data=org.cishell.streaming.prototype.streamcore.Stream
+service.pid=org.cishell.streaming.prototype.IncreasingNumberStreamAlgorithm
+remoteable=true
Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.xml
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.xml (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/increasingnumberstream.xml 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<component name="org.cishell.streaming.prototype.IncreasingNumberStreamAlgorithm.component" immediate="false">
+ <implementation class="org.cishell.streaming.prototype.IncreasingNumberStreamAlgorithm$Factory"/>
+ <properties entry="OSGI-INF/increasingnumberstream.properties"/>
+
+ <service>
+ <provide interface=
+ "org.cishell.framework.algorithm.AlgorithmFactory"/>
+ </service>
+</component>
\ No newline at end of file
Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/l10n/bundle_en.properties
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/l10n/bundle_en.properties (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/l10n/bundle_en.properties 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,7 @@
+#Localization variables for OSGI-INF/metatatype/METADATA.XML
+#
+#Samples:
+#input=Input
+#desc=Enter an integer (that will be converted to a string)
+#name=Input->String
+#name_desc=Converts inputted integer to string
Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.properties
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.properties (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.properties 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,7 @@
+menu_path=Streaming/start
+label=Printing Data Prototype
+description=Print data.
+in_data=org.cishell.streaming.prototype.streamcore.Stream
+out_data=null
+service.pid=org.cishell.streaming.prototype.LoggingConsumerAlgorithm
+remoteable=true
Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.xml
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.xml (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/loggingconsumer.xml 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<component name="org.cishell.streaming.prototype.LoggingConsumerAlgorithm.component" immediate="false">
+ <implementation class="org.cishell.streaming.prototype.LoggingConsumerAlgorithm$Factory"/>
+ <properties entry="OSGI-INF/loggingconsumer.properties"/>
+
+ <service>
+ <provide interface=
+ "org.cishell.framework.algorithm.AlgorithmFactory"/>
+ </service>
+</component>
\ No newline at end of file
Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/metatype/METADATA.XML
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/metatype/METADATA.XML (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/metatype/METADATA.XML 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<metatype:MetaData xmlns:metatype="http://www.osgi.org/xmlns/metatype/v1.0.0">
+ <OCD name="Streaming Data Prototype" id="org.cishell.streaming.prototype.Streamer.OCD"
+ description="Stream data. ">
+ </OCD>
+ <Designate pid="org.cishell.streaming.prototype.Streamer">
+ <Object ocdref="org.cishell.streaming.prototype.Streamer.OCD" />
+ </Designate>
+</metatype:MetaData>
Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.properties
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.properties (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.properties 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,7 @@
+menu_path=Streaming/start
+label=Printing Data Prototype
+description=Print data.
+out_data=null
+in_data=org.cishell.streaming.prototype.streamlib.Stream
+service.pid=org.cishell.streaming.prototype.Printer
+remoteable=true
Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.xml
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.xml (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/printer.xml 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<component name="org.cishell.streaming.prototype.Printer.component" immediate="false">
+ <implementation class="org.cishell.streaming.prototype.PrinterFactory"/>
+ <properties entry="OSGI-INF/printer.properties"/>
+
+ <service>
+ <provide interface=
+ "org.cishell.framework.algorithm.AlgorithmFactory"/>
+ </service>
+</component>
\ No newline at end of file
Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.properties
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.properties (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.properties 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,7 @@
+menu_path=Streaming/start
+label=Streaming Data Prototype
+description=Stream data.
+in_data=null
+out_data=org.cishell.streaming.prototype.streamlib.Stream
+service.pid=org.cishell.streaming.prototype.Streamer
+remoteable=true
Added: trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.xml
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.xml (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/OSGI-INF/streamer.xml 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<component name="org.cishell.streaming.prototype.Streamer.component" immediate="false">
+ <implementation class="org.cishell.streaming.prototype.StreamerFactory"/>
+ <properties entry="OSGI-INF/streamer.properties"/>
+
+ <service>
+ <provide interface=
+ "org.cishell.framework.algorithm.AlgorithmFactory"/>
+ </service>
+</component>
\ No newline at end of file
Added: trunk/testing/org.cishell.streaming.prototype/build.properties
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/build.properties (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/build.properties 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,5 @@
+source.. = src/
+output.. = build/
+bin.includes = META-INF/,\
+ .,\
+ OSGI-INF/
Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/IncreasingNumberStreamAlgorithm.java
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/IncreasingNumberStreamAlgorithm.java (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/IncreasingNumberStreamAlgorithm.java 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,49 @@
+package org.cishell.streaming.prototype;
+
+import java.util.Dictionary;
+
+import org.cishell.framework.CIShellContext;
+import org.cishell.framework.algorithm.Algorithm;
+import org.cishell.framework.algorithm.AlgorithmFactory;
+import org.cishell.framework.data.Data;
+import org.cishell.streaming.prototype.streamcore.DefaultStream;
+import org.cishell.streaming.prototype.streamcore.Stream;
+import org.cishell.streaming.prototype.streamcore.StreamAlgorithm;
+
+public class IncreasingNumberStreamAlgorithm extends StreamAlgorithm<Integer> {
+ @SuppressWarnings("unchecked") // TODO
+ public IncreasingNumberStreamAlgorithm(Data[] data, Dictionary parameters,
+ CIShellContext context) {
+ super(data, parameters, context);
+ }
+
+ @Override
+ protected Stream<Integer> createStream() {
+ return new IncreasingNumberStream();
+ }
+
+ private class IncreasingNumberStream extends DefaultStream<Integer> {
+ public static final int MAX_EMISSIONS = 20;
+
+ private int numEmissions = 0;
+
+ public Integer next() {
+ // TODO Debug only
+ System.out.println("About to produce " + (numEmissions));
+ return numEmissions++;
+ }
+
+ public boolean isFinished() {
+ return numEmissions == MAX_EMISSIONS;
+ }
+ }
+
+ public static class Factory implements AlgorithmFactory {
+ @SuppressWarnings("unchecked") // TODO
+ public Algorithm createAlgorithm(Data[] data, Dictionary parameters,
+ CIShellContext context) {
+ return new IncreasingNumberStreamAlgorithm(
+ data, parameters, context);
+ }
+ }
+}
Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/LoggingConsumerAlgorithm.java
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/LoggingConsumerAlgorithm.java (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/LoggingConsumerAlgorithm.java 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,39 @@
+package org.cishell.streaming.prototype;
+
+import java.util.Dictionary;
+
+import org.cishell.framework.CIShellContext;
+import org.cishell.framework.algorithm.Algorithm;
+import org.cishell.framework.algorithm.AlgorithmFactory;
+import org.cishell.framework.data.Data;
+import org.cishell.streaming.prototype.streamcore.ConsumerAlgorithm;
+import org.osgi.service.log.LogService;
+
+public class LoggingConsumerAlgorithm extends ConsumerAlgorithm<Object> {
+ private LogService logger;
+
+
+ @SuppressWarnings("unchecked") // TODO
+ public LoggingConsumerAlgorithm(
+ Data[] data, Dictionary parameters, CIShellContext context) {
+ super(data, parameters, context);
+
+ this.logger =
+ (LogService) context.getService(LogService.class.getName());
+ }
+
+
+ @Override
+ public void consume(Object value) {
+ logger.log(LogService.LOG_INFO, "Consuming " + value);
+ }
+
+
+ public static class Factory implements AlgorithmFactory {
+ @SuppressWarnings("unchecked") // TODO
+ public Algorithm createAlgorithm(
+ Data[] data, Dictionary parameters, CIShellContext context) {
+ return new LoggingConsumerAlgorithm(data, parameters, context);
+ }
+ }
+}
Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/Printer.java
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/Printer.java (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/Printer.java 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,38 @@
+package org.cishell.streaming.prototype;
+
+import java.util.Dictionary;
+
+import org.cishell.framework.CIShellContext;
+import org.cishell.framework.algorithm.Algorithm;
+import org.cishell.framework.algorithm.AlgorithmExecutionException;
+import org.cishell.framework.data.Data;
+import org.cishell.streaming.prototype.streamlib.Stream;
+import org.osgi.service.log.LogService;
+
+public class Printer implements Algorithm {
+
+ private Stream<String> stream;
+ private LogService logger;
+
+
+ public Printer(Data[] data, Dictionary parameters, CIShellContext context) {
+ this.stream = (Stream<String>) data[0].getData();
+ this.logger = (LogService) context.getService(LogService.class.getName());
+ }
+
+ public Data[] execute() throws AlgorithmExecutionException {
+
+ int nextToPrint = 0;
+ while(!stream.isFinalEndpoint(nextToPrint)) {
+ int endpoint = stream.getCurrentEndpoint();
+ for(int ii = nextToPrint; ii < endpoint; ii++) {
+ logger.log(LogService.LOG_INFO, "Received: " + stream.getValueAtTimestep(ii));
+ }
+ nextToPrint = endpoint;
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {}
+ }
+ return null;
+ }
+}
\ No newline at end of file
Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/PrinterFactory.java
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/PrinterFactory.java (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/PrinterFactory.java 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,14 @@
+package org.cishell.streaming.prototype;
+
+import java.util.Dictionary;
+
+import org.cishell.framework.CIShellContext;
+import org.cishell.framework.algorithm.Algorithm;
+import org.cishell.framework.algorithm.AlgorithmFactory;
+import org.cishell.framework.data.Data;
+
+public class PrinterFactory implements AlgorithmFactory {
+ public Algorithm createAlgorithm(Data[] data, Dictionary parameters, CIShellContext context) {
+ return new Printer(data, parameters, context);
+ }
+}
\ No newline at end of file
Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/RandomNumberStream.java
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/RandomNumberStream.java (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/RandomNumberStream.java 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,19 @@
+package org.cishell.streaming.prototype;
+
+import org.cishell.streaming.prototype.streamlib.StreamImpl;
+
+public class RandomNumberStream extends StreamImpl<String> {
+
+ public static final int MAX_EMISSIONS = 20;
+
+ private int numEmissions = 0;
+
+ public String yield() {
+ numEmissions++;
+ return "Generated " + Math.random() + " | Time step ID: " + numEmissions;
+ }
+
+ public boolean isFinished() {
+ return numEmissions == MAX_EMISSIONS;
+ }
+}
Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerAlgorithm.java
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerAlgorithm.java (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerAlgorithm.java 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,28 @@
+package org.cishell.streaming.prototype;
+
+import java.util.Dictionary;
+
+import org.cishell.framework.CIShellContext;
+import org.cishell.framework.algorithm.Algorithm;
+import org.cishell.framework.algorithm.AlgorithmExecutionException;
+import org.cishell.framework.data.BasicData;
+import org.cishell.framework.data.Data;
+import org.cishell.streaming.prototype.streamlib.StreamImpl;
+
+public class StreamerAlgorithm implements Algorithm {
+ private Data[] data;
+ private Dictionary parameters;
+ private CIShellContext context;
+
+ public StreamerAlgorithm(Data[] data, Dictionary parameters, CIShellContext context) {
+ this.data = data;
+ this.parameters = parameters;
+ this.context = context;
+ }
+
+ public Data[] execute() throws AlgorithmExecutionException {
+ Data[] output = new Data[]{new BasicData(new RandomNumberStream(), StreamImpl.class.getName())};
+
+ return output;
+ }
+}
\ No newline at end of file
Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerFactory.java
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerFactory.java (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/StreamerFactory.java 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,14 @@
+package org.cishell.streaming.prototype;
+
+import java.util.Dictionary;
+
+import org.cishell.framework.CIShellContext;
+import org.cishell.framework.algorithm.Algorithm;
+import org.cishell.framework.algorithm.AlgorithmFactory;
+import org.cishell.framework.data.Data;
+
+public class StreamerFactory implements AlgorithmFactory {
+ public Algorithm createAlgorithm(Data[] data, Dictionary parameters, CIShellContext context) {
+ return new StreamerAlgorithm(data, parameters, context);
+ }
+}
\ No newline at end of file
Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/ConsumerAlgorithm.java
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/ConsumerAlgorithm.java (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/ConsumerAlgorithm.java 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,47 @@
+package org.cishell.streaming.prototype.streamcore;
+
+import java.util.Dictionary;
+
+import org.cishell.framework.CIShellContext;
+import org.cishell.framework.algorithm.Algorithm;
+import org.cishell.framework.algorithm.AlgorithmExecutionException;
+import org.cishell.framework.data.Data;
+
+public abstract class ConsumerAlgorithm<T> implements Algorithm {
+ private Stream<T> stream;
+
+
+ @SuppressWarnings("unchecked") // TODO
+ public ConsumerAlgorithm(Data[] data, Dictionary parameters,
+ CIShellContext context) {
+ this.stream = (Stream<T>) data[0].getData();
+ }
+
+
+ public Data[] execute() throws AlgorithmExecutionException {
+ int nextToPrint = 0;
+
+ while (!stream.isFinalEndpoint(nextToPrint)) {
+ int endpoint = stream.getCurrentEndpoint();
+
+ for (int ii = nextToPrint; ii < endpoint; ii++) {
+ // TODO Debug only
+ System.out.println("About to consume "
+ + stream.getValueAtTimestep(ii)
+ + " from timestep "
+ + ii);
+ consume(stream.getValueAtTimestep(ii));
+ }
+
+ nextToPrint = endpoint;
+
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {}
+ }
+
+ return null;
+ }
+
+ public abstract void consume(T value);
+}
\ No newline at end of file
Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/DefaultStream.java
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/DefaultStream.java (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/DefaultStream.java 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,85 @@
+package org.cishell.streaming.prototype.streamcore;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class DefaultStream<T> implements Stream<T> {
+ private static final long DEFAULT_SLEEP_TIME = 1000;
+
+ private List<T> dataList =
+ Collections.synchronizedList(new ArrayList<T>());
+ private Thread streamingThread;
+
+ private boolean paused;
+
+ private boolean stopped;
+
+
+ public DefaultStream() {
+ this(DEFAULT_SLEEP_TIME);
+ }
+
+ public DefaultStream(final long sleepTime) {
+ this.streamingThread = new Thread(new Runnable() {
+ public void run() {
+ while(!isStopped() && !isFinished()) {
+ while (isPaused()) {
+ // TODO Debug only
+ System.out.println("Still paused!");
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {}
+ }
+
+ dataList.add(next());
+
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {}
+ }
+ }
+ });
+
+ streamingThread.start();
+ }
+
+ public void pause() {
+ setPaused(true);
+ }
+ public void unpause() {
+ setPaused(false);
+ }
+ public boolean isPaused() {
+ return paused;
+ }
+ public void setPaused(boolean paused) {
+ this.paused = paused;
+ }
+
+ public void stop() {
+ setStopped(true);
+ }
+ public boolean isStopped() {
+ return stopped;
+ }
+ public void setStopped(boolean stopped) {
+ this.stopped = stopped;
+ }
+
+ public abstract boolean isFinished();
+
+
+ public int getCurrentEndpoint() {
+ return dataList.size();
+ }
+
+ public T getValueAtTimestep(int timestep) {
+ return dataList.get(timestep);
+ }
+
+ public boolean isFinalEndpoint(int candidateEndpoint) {
+ return ((!streamingThread.isAlive())
+ && (candidateEndpoint == getCurrentEndpoint()));
+ }
+}
Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/Stream.java
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/Stream.java (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/Stream.java 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,12 @@
+package org.cishell.streaming.prototype.streamcore;
+
+public interface Stream<T> {
+ public T next();
+ public void pause();
+ public void unpause();
+ public void stop();
+
+ public int getCurrentEndpoint();
+ public T getValueAtTimestep(int timestep);
+ public boolean isFinalEndpoint(int candidateEndpoint);
+}
Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/StreamAlgorithm.java
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/StreamAlgorithm.java (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamcore/StreamAlgorithm.java 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,79 @@
+package org.cishell.streaming.prototype.streamcore;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+
+import org.cishell.app.service.datamanager.DataManagerListener;
+import org.cishell.app.service.datamanager.DataManagerService;
+import org.cishell.app.service.scheduler.SchedulerListener;
+import org.cishell.app.service.scheduler.SchedulerService;
+import org.cishell.framework.CIShellContext;
+import org.cishell.framework.algorithm.Algorithm;
+import org.cishell.framework.algorithm.AlgorithmExecutionException;
+import org.cishell.framework.data.BasicData;
+import org.cishell.framework.data.Data;
+
+public abstract class StreamAlgorithm<T>
+ implements Algorithm, DataManagerListener, SchedulerListener {
+ private Data outStreamContainer;
+ private Stream<T> stream;
+ private DataManagerService dataManager;
+ private SchedulerService scheduler;
+
+ @SuppressWarnings("unchecked")
+ public StreamAlgorithm(
+ Data[] data, Dictionary parameters, CIShellContext context) {
+ this.dataManager =
+ (DataManagerService) context.getService(
+ DataManagerService.class.getName());
+ dataManager.addDataManagerListener(this);
+
+ this.scheduler =
+ (SchedulerService) context.getService(
+ SchedulerService.class.getName());
+ scheduler.addSchedulerListener(this);
+ }
+
+
+ public Data[] execute() throws AlgorithmExecutionException {
+ this.stream = createStream();
+ this.outStreamContainer =
+ new BasicData(stream, Stream.class.getName());
+ return new Data[]{ outStreamContainer };
+ }
+
+ protected abstract Stream<T> createStream();
+
+
+ // *** DataManagerListener (TODO: Implement as appropriate)
+ public void dataAdded(Data data, String label) {}
+ public void dataLabelChanged(Data data, String label) {}
+ public void dataSelected(Data[] data) {}
+ public void dataRemoved(Data data) {
+ if (outStreamContainer.equals(data)) {
+ stream.stop();
+ dataManager.removeDataManagerListener(this);
+ scheduler.removeSchedulerListener(this);
+ }
+ }
+ // *** DataManagerListener (TODO: Implement as appropriate)
+
+
+ // *** SchedulerListener (TODO: Implement as appropriate)
+ public void algorithmScheduled(Algorithm algorithm, Calendar time) {}
+ public void algorithmRescheduled(Algorithm algorithm, Calendar time) {}
+ public void algorithmUnscheduled(Algorithm algorithm) {}
+ public void algorithmStarted(Algorithm algorithm) {}
+ public void algorithmFinished(Algorithm algorithm, Data[] createdData) {}
+ public void algorithmError(Algorithm algorithm, Throwable error) {}
+ public void schedulerRunStateChanged(boolean isRunning) {
+ System.out.println("Scheduler run state changed!");
+ if (isRunning) {
+ stream.unpause();
+ } else {
+ stream.pause();
+ }
+ }
+ public void schedulerCleared() {}
+ // *** SchedulerListener (TODO: Implement as appropriate)
+}
\ No newline at end of file
Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/Stream.java
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/Stream.java (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/Stream.java 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,11 @@
+package org.cishell.streaming.prototype.streamlib;
+
+public interface Stream<T> {
+
+ public abstract int getCurrentEndpoint();
+
+ public abstract T getValueAtTimestep(int timestep);
+
+ public abstract boolean isFinalEndpoint(int candidateEndpoint);
+
+}
Added: trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/StreamImpl.java
===================================================================
--- trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/StreamImpl.java (rev 0)
+++ trunk/testing/org.cishell.streaming.prototype/src/org/cishell/streaming/prototype/streamlib/StreamImpl.java 2009-08-28 19:19:55 UTC (rev 941)
@@ -0,0 +1,57 @@
+package org.cishell.streaming.prototype.streamlib;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class StreamImpl<T> implements Stream<T> {
+
+ private static final long DEFAULT_SLEEP_TIME = 1000;
+
+ private List<T> streamingList = Collections.synchronizedList(new ArrayList<T>());
+ private Thread streamingThread;
+
+ public StreamImpl() {
+ this(DEFAULT_SLEEP_TIME);
+ }
+
+ public StreamImpl(final long sleepTime) {
+ this.streamingThread = new Thread(new Runnable(){
+ public void run() {
+ while(! StreamImpl.this.isFinished()) {
+ StreamImpl.this.streamingList.add(StreamImpl.this.yield());
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {}
+ }
+ }
+ });
+
+ streamingThread.start();
+ }
+
+
+ public abstract boolean isFinished();
+ public abstract T yield();
+
+ /* (non-Javadoc)
+ * @see org.cishell.streaming.prototype.streamlib.Streamo#getCurrentEndpoint()
+ */
+ public int getCurrentEndpoint() {
+ return streamingList.size();
+ }
+
+ /* (non-Javadoc)
+ * @see org.cishell.streaming.prototype.streamlib.Streamo#getValueAtTimestep(int)
+ */
+ public T getValueAtTimestep(int timestep) {
+ return streamingList.get(timestep);
+ }
+
+ /* (non-Javadoc)
+ * @see org.cishell.streaming.prototype.streamlib.Streamo#isFinalEndpoint(int)
+ */
+ public boolean isFinalEndpoint(int candidateEndpoint) {
+ return !streamingThread.isAlive() && candidateEndpoint == getCurrentEndpoint();
+ }
+}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|