|
From: <cr...@us...> - 2009-03-13 11:25:06
|
Revision: 5096
http://jnode.svn.sourceforge.net/jnode/?rev=5096&view=rev
Author: crawley
Date: 2009-03-13 11:24:46 +0000 (Fri, 13 Mar 2009)
Log Message:
-----------
Fixed bugs and expanded unit test for Pipeline et al.
Modified Paths:
--------------
trunk/shell/src/shell/org/jnode/shell/io/Pipeline.java
trunk/shell/src/test/org/jnode/test/shell/io/PipelineTest.java
Modified: trunk/shell/src/shell/org/jnode/shell/io/Pipeline.java
===================================================================
--- trunk/shell/src/shell/org/jnode/shell/io/Pipeline.java 2009-03-10 14:34:35 UTC (rev 5095)
+++ trunk/shell/src/shell/org/jnode/shell/io/Pipeline.java 2009-03-13 11:24:46 UTC (rev 5096)
@@ -32,49 +32,77 @@
*/
public class Pipeline {
// FIXME This first-cut implementation unnecessarily double-copies data when
- // a reader is waiting for it. Also, it doesn't fill/empty the buffer in a
- // circular fashion. Finally, it doesn't implement atomic writes / reads or
- // detect situations where behavior is non-deterministic.
+ // a reader is waiting for it. It doesn't fill/empty the buffer in a
+ // circular fashion. If there are multiple active readers or writers,
+ // too many threads get woken up. Finally, this class doesn't implement
+ // atomic writes / reads or detect cases where behavior is non-deterministic.
private List<PipelineInputStream> sinks =
new ArrayList<PipelineInputStream>();
private List<PipelineOutputStream> sources =
new ArrayList<PipelineOutputStream>();
- private byte[] buffer = new byte[1024];
+ private byte[] buffer;
private int pos = 0;
private int lim = 0;
- private boolean activated;
+ private int state = INITIAL;
+ private static final int INITIAL = 1;
+ private static final int ACTIVE = 2;
+ private static final int CLOSED = 4;
+ private static final int SHUTDOWN = 8;
+
+ private static final String[] STATE_NAMES = new String[] {
+ null, "INITIAL", "ACTIVE", null, "CLOSED",
+ null, null, null, "SHUTDOWN"
+ };
+
/**
- * Create a pipeline, in 'inactive' state.
+ * The default Pipeline buffer size.
*/
+ public static final int DEFAULT_BUFFER_SIZE = 1024;
+
+ /**
+ * Create a pipeline, in 'inactive' state with the default buffer size;
+ */
public Pipeline() {
+ buffer = new byte[DEFAULT_BUFFER_SIZE];
}
/**
+ * Create a pipeline, in 'inactive' state.
+ * @param bufferSize the pipeline's buffer size.
+ */
+ public Pipeline(int bufferSize) {
+ buffer = new byte[bufferSize];
+ }
+
+ /**
* Create a sink for a inactive pipeline.
* @return the sink.
* @throws IOException This is thrown if the pipeline is 'active' or 'shut down'.
*/
public synchronized PipelineInputStream createSink() throws IOException {
- if (activated) {
- throw new IOException("pipeline state wrong");
- }
+ checkState(INITIAL, "create");
PipelineInputStream is = new PipelineInputStream(this);
sinks.add(is);
return is;
}
+ private void checkState(int allowedStates, String action) throws IOException {
+ if ((state & allowedStates) == 0) {
+ String stateName = STATE_NAMES[state];
+ throw new IOException(action + " not allowed in state " + stateName);
+ }
+ }
+
/**
* Create a source for a inactive pipeline.
* @return the source.
* @throws IOException This is thrown if the pipeline is 'active' or 'shut down'.
*/
public synchronized PipelineOutputStream createSource() throws IOException {
- if (activated) {
- throw new IOException("pipeline state wrong");
- }
+ checkState(INITIAL, "create");
PipelineOutputStream os = new PipelineOutputStream(this);
sources.add(os);
return os;
@@ -86,13 +114,11 @@
* if it is 'inactive' but there are no sources or sinks.
*/
public synchronized void activate() throws IOException {
- if (buffer == null) {
- throw new IOException("pipeline state wrong");
- }
+ checkState(INITIAL, "activate");
if (sinks.isEmpty() || sources.isEmpty()) {
throw new IOException("pipeline has no inputs and/or outputs");
}
- activated = true;
+ state = ACTIVE;
}
/**
@@ -100,15 +126,23 @@
* @return
*/
public synchronized boolean isActive() {
- return activated && buffer != null;
+ return state == ACTIVE;
}
/**
+ * Test if the pipeline is in the 'closed' state.
+ * @return
+ */
+ public synchronized boolean isClosed() {
+ return state == CLOSED;
+ }
+
+ /**
* Test if the pipeline is in the 'shut down' state.
* @return
*/
public synchronized boolean isShutdown() {
- return activated && buffer == null;
+ return state == SHUTDOWN;
}
/**
@@ -116,97 +150,98 @@
* currently blocked on sources or sinks to get an IOException.
*/
public synchronized void shutdown() {
- buffer = null;
+ state = SHUTDOWN;
this.notifyAll();
}
- synchronized int available() {
+ synchronized int available() throws IOException {
+ checkState(ACTIVE, "available");
return lim - pos;
}
synchronized void closeInput(PipelineInputStream input) {
sinks.remove(input);
if (sinks.isEmpty()) {
- buffer = null;
- this.notifyAll();
+ if (state < CLOSED) {
+ state = CLOSED;
+ this.notifyAll();
+ }
}
}
synchronized void closeOutput(PipelineOutputStream output) {
sources.remove(output);
if (sources.isEmpty()) {
- buffer = null;
- this.notifyAll();
+ if (state < CLOSED) {
+ state = CLOSED;
+ this.notifyAll();
+ }
}
}
synchronized int read(byte[] b, int off, int len) throws IOException {
- if (buffer == null) {
- throw new IOException("pipeline state wrong");
- }
+ checkState(ACTIVE | CLOSED | SHUTDOWN, "read");
int startOff = off;
- while (off < len && !sources.isEmpty()) {
- while (pos == lim) {
+ while (off < len && state <= CLOSED) {
+ while (pos == lim && state == ACTIVE) {
try {
this.wait();
} catch (InterruptedException ex) {
throw new InterruptedIOException();
}
}
- while (off < len && pos != lim) {
+ if (pos == lim) {
+ break;
+ }
+ while (off < len && pos < lim) {
b[off++] = buffer[pos++];
}
if (pos == lim) {
pos = 0;
lim = 0;
}
- this.notify();
+ this.notifyAll();
}
return (off == startOff) ? -1 : (off - startOff);
}
synchronized long skip(long n) throws IOException {
- if (buffer == null) {
- throw new IOException("pipeline state wrong");
- }
+ checkState(ACTIVE | CLOSED | SHUTDOWN, "skip");
long off = 0;
- while (off < n && !sources.isEmpty()) {
- while (pos == lim) {
+ while (off < n && state <= CLOSED) {
+ while (pos == lim && state == ACTIVE) {
try {
this.wait();
} catch (InterruptedException ex) {
throw new InterruptedIOException();
}
}
+ if (pos == lim) {
+ break;
+ }
long count = Math.min(lim - pos, n - off);
pos += count;
if (pos == lim) {
pos = 0;
lim = 0;
}
- this.notify();
+ this.notifyAll();
}
return off == 0 ? -1 : off;
}
synchronized void flush() throws IOException {
- if (buffer == null) {
- throw new IOException("pipeline state wrong");
- }
+ checkState(ACTIVE | CLOSED, "flush");
this.notifyAll();
}
synchronized void write(byte[] b, int off, int len) throws IOException {
- if (buffer == null) {
- throw new IOException("pipeline state wrong");
- }
+ checkState(ACTIVE, "write");
while (off < len) {
while (lim == buffer.length) {
- if (sinks.isEmpty()) {
- throw new IOException("pipeline broken");
- }
try {
this.wait();
+ checkState(ACTIVE, "write");
} catch (InterruptedException ex) {
throw new InterruptedIOException();
}
@@ -214,7 +249,7 @@
while (off < len && lim < buffer.length) {
buffer[lim++] = b[off++];
}
- this.notify();
+ this.notifyAll();
}
}
}
Modified: trunk/shell/src/test/org/jnode/test/shell/io/PipelineTest.java
===================================================================
--- trunk/shell/src/test/org/jnode/test/shell/io/PipelineTest.java 2009-03-10 14:34:35 UTC (rev 5095)
+++ trunk/shell/src/test/org/jnode/test/shell/io/PipelineTest.java 2009-03-13 11:24:46 UTC (rev 5096)
@@ -17,13 +17,17 @@
* along with this library; If not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-
package org.jnode.test.shell.io;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import junit.framework.TestCase;
@@ -40,14 +44,17 @@
InputStream is = p.createSink();
OutputStream os = p.createSource();
assertFalse(p.isActive());
+ assertFalse(p.isClosed());
assertFalse(p.isShutdown());
p.activate();
assertTrue(p.isActive());
+ assertFalse(p.isClosed());
assertFalse(p.isShutdown());
is.close();
os.close();
assertFalse(p.isActive());
- assertTrue(p.isShutdown());
+ assertTrue(p.isClosed());
+ assertFalse(p.isShutdown());
}
public void testLifecycle2() throws IOException {
@@ -62,13 +69,8 @@
p.shutdown();
assertFalse(p.isActive());
assertTrue(p.isShutdown());
+ assertEquals(-1, is.read());
try {
- is.read();
- fail("no exception on read()");
- } catch (IOException ex) {
- // expected ...
- }
- try {
os.write('X');
fail("no exception on write()");
} catch (IOException ex) {
@@ -97,4 +99,244 @@
assertEquals(len, is.read(buffer, 0, len));
assertEquals("the quick brown fox", new String(buffer, 0, len));
}
+
+ public void testTwo1_One1() throws Throwable {
+ Pipeline p = new Pipeline();
+ InputStream is = p.createSink();
+ OutputStream os = p.createSource();
+ OutputStream os2 = p.createSource();
+ p.activate();
+
+ Source source = new Source("1".getBytes(), 10000, -1, os);
+ Source source2 = new Source("2".getBytes(), 10000, -1, os2);
+ Sink sink = new Sink(20000, 1, -1, is);
+
+ List<Throwable> exceptions = runInThreads(new Runnable[] {source, source2, sink});
+ if (exceptions.size() > 0) {
+ throw exceptions.get(0);
+ }
+ assertEquals(10000, sink.getCount((byte) '1'));
+ assertEquals(10000, sink.getCount((byte) '2'));
+ }
+
+ public void testTwo10_One10() throws Throwable {
+ Pipeline p = new Pipeline();
+ InputStream is = p.createSink();
+ OutputStream os = p.createSource();
+ OutputStream os2 = p.createSource();
+ p.activate();
+
+ Source source = new Source("1111111111".getBytes(), 1000, -1, os);
+ Source source2 = new Source("2222222222".getBytes(), 1000, -1, os2);
+ Sink sink = new Sink(20000, 10, -1, is);
+
+ List<Throwable> exceptions = runInThreads(new Runnable[] {source, source2, sink});
+ if (exceptions.size() > 0) {
+ throw exceptions.get(0);
+ }
+ assertEquals(10000, sink.getCount((byte) '1'));
+ assertEquals(10000, sink.getCount((byte) '2'));
+ }
+
+ public void testTwo100_One100() throws Throwable {
+ Pipeline p = new Pipeline();
+ InputStream is = p.createSink();
+ OutputStream os = p.createSource();
+ OutputStream os2 = p.createSource();
+ p.activate();
+
+ byte[] buff1 = new byte[100];
+ Arrays.fill(buff1, (byte) '1');
+ Source source = new Source(buff1, 100, -1, os);
+ byte[] buff2 = new byte[100];
+ Arrays.fill(buff2, (byte) '2');
+ Source source2 = new Source(buff2, 100, -1, os2);
+ Sink sink = new Sink(20000, 100, -1, is);
+
+ List<Throwable> exceptions = runInThreads(new Runnable[] {source, source2, sink});
+ if (exceptions.size() > 0) {
+ throw exceptions.get(0);
+ }
+ assertEquals(10000, sink.getCount((byte) '1'));
+ assertEquals(10000, sink.getCount((byte) '2'));
+ }
+
+ public void testTwo100_One100_SmallBuffer() throws Throwable {
+ Pipeline p = new Pipeline(100);
+ InputStream is = p.createSink();
+ OutputStream os = p.createSource();
+ OutputStream os2 = p.createSource();
+ p.activate();
+
+ byte[] buff1 = new byte[100];
+ Arrays.fill(buff1, (byte) '1');
+ Source source = new Source(buff1, 100, -1, os);
+ byte[] buff2 = new byte[100];
+ Arrays.fill(buff2, (byte) '2');
+ Source source2 = new Source(buff2, 100, -1, os2);
+ Sink sink = new Sink(20000, 100, -1, is);
+
+ List<Throwable> exceptions = runInThreads(new Runnable[] {source, source2, sink});
+ if (exceptions.size() > 0) {
+ throw exceptions.get(0);
+ }
+ assertEquals(10000, sink.getCount((byte) '1'));
+ assertEquals(10000, sink.getCount((byte) '2'));
+ }
+
+ /**
+ * Create Threads for each runnable (with an exception handler), start them,
+ * join them and return any exceptions
+ * @param runnables the test runnables
+ * @return any exceptions caught.
+ */
+ private List<Throwable> runInThreads(Runnable[] runnables) {
+ final List<Throwable> exceptions = new ArrayList<Throwable>();
+ Thread[] threads = new Thread[runnables.length];
+ for (int i = 0; i < threads.length; i++) {
+ System.err.println("create");
+ threads[i] = new Thread(runnables[i]);
+ threads[i].setUncaughtExceptionHandler(
+ new UncaughtExceptionHandler() {
+ public void uncaughtException(Thread thr, Throwable exc) {
+ System.err.println("handled exception " + exc);
+ exc.printStackTrace();
+ exceptions.add(exc);
+ }
+ });
+ }
+ for (Thread thread : threads) {
+ System.err.println("start");
+ thread.start();
+ }
+ for (Thread thread : threads) {
+ try {
+ System.err.println("join");
+ thread.join();
+ } catch (InterruptedException ex) {
+ exceptions.add(ex);
+ }
+ }
+ System.err.println("done");
+ return exceptions;
+ }
+
+ private static class Source implements Runnable {
+ private byte[] b;
+ private int count;
+ private int sleep;
+ private OutputStream os;
+
+ /**
+ * Create a test source.
+ * @param b the buffer to be written
+ * @param count the number of times
+ * @param sleep sleep this number of milliseconds between writes; -1 means no sleep.
+ * @param os write bytes here.
+ */
+ Source(byte[] b, int count, int sleep, OutputStream os) {
+ this.b = b;
+ this.count = count;
+ this.os = os;
+ this.sleep = sleep;
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < count; i++) {
+ os.write(b);
+ if (sleep != -1) {
+ Thread.sleep(sleep);
+ }
+ }
+ os.close();
+ os = null;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ System.err.println("wrote " + count);
+ try {
+ if (os != null) {
+ os.close();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ private static class Sink implements Runnable {
+ private HashMap<Byte, Integer> counters = new HashMap<Byte, Integer>();
+ private InputStream is;
+ private int max;
+ private int sleep;
+ private int len;
+
+ /**
+ * Create a test Sink.
+ * @param max if we read more than this number of bytes, throw an exception
+ * @param len the read buffer size.
+ * @param sleep sleep this number of milliseconds between reads; -1 means no sleep.
+ * @param is read bytes from here.
+ */
+ Sink(int max, int len, int sleep, InputStream is) {
+ this.is = is;
+ this.max = max;
+ this.sleep = sleep;
+ this.len = len;
+ }
+
+ @Override
+ public void run() {
+ try {
+ int counter = 0;
+ byte[] buffer = new byte[len];
+ int nosRead = is.read(buffer);
+ while (nosRead != -1) {
+ for (int i = 0; i < nosRead; i++) {
+ Byte bb = new Byte((byte) buffer[i]);
+ Integer cc = counters.get(bb);
+ int c = (cc == null) ? 1 : (cc.intValue() + 1);
+ counters.put(bb, new Integer(c));
+ if (++counter > max) {
+ throw new RuntimeException("too many (" + counter + ")");
+ }
+ }
+ if (sleep != -1) {
+ Thread.sleep(sleep);
+ }
+ nosRead = is.read(buffer);
+ }
+ System.err.println("read " + counter);
+ is.close();
+ is = null;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ if (is != null) {
+ is.close();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Get the number of bytes read for a given byte value.
+ * @param b the key byte
+ * @return the number instances of the 'key' byte read.
+ */
+ public int getCount(byte b) {
+ Integer cc = counters.get(b);
+ return (cc == null) ? 0 : cc.intValue();
+ }
+ }
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|