|
From: <cr...@us...> - 2009-03-10 14:34:47
|
Revision: 5095
http://jnode.svn.sourceforge.net/jnode/?rev=5095&view=rev
Author: crawley
Date: 2009-03-10 14:34:35 +0000 (Tue, 10 Mar 2009)
Log Message:
-----------
New classes to be used in a reworked pipeline implementation for Bjorne.
(Unfortunately, the classic PipedInput/Output won't work when there are
potentially multiple threads reading or writing the same pipe.)
Added Paths:
-----------
trunk/shell/src/shell/org/jnode/shell/io/Pipeline.java
trunk/shell/src/shell/org/jnode/shell/io/PipelineInputStream.java
trunk/shell/src/shell/org/jnode/shell/io/PipelineOutputStream.java
trunk/shell/src/test/org/jnode/test/shell/io/PipelineTest.java
Added: trunk/shell/src/shell/org/jnode/shell/io/Pipeline.java
===================================================================
--- trunk/shell/src/shell/org/jnode/shell/io/Pipeline.java (rev 0)
+++ trunk/shell/src/shell/org/jnode/shell/io/Pipeline.java 2009-03-10 14:34:35 UTC (rev 5095)
@@ -0,0 +1,220 @@
+package org.jnode.shell.io;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class provides a buffered byte-stream pipeline implementation that
+ * supports multiple sources and sinks.  The pipeline has a finite buffer,
+ * so a thread that both reads from and writes to the same pipeline risks
+ * deadlock.
+ * <p>
+ * Unlike the standard Piped* classes, Pipeline and its related classes do
+ * not try to detect dead pipes based on the exit of threads.  Instead, a
+ * pipeline shuts down when its sources are closed, or when the
+ * {@link #shutdown()} method is called.
+ * <p>
+ * The intended lifecycle of a Pipeline is as follows:
+ * <ol>
+ * <li>A Pipeline object is instantiated.</li>
+ * <li>One or more sources and sinks are created using {@link #createSource()}
+ *     and {@link #createSink()}.</li>
+ * <li>The Pipeline is activated by calling {@link #activate()}.</li>
+ * <li>Data is written to the pipeline sources and read from the sinks.</li>
+ * <li>The pipeline sources are closed, causing the pipeline to shut down
+ *     cleanly.  Read calls on the sinks will return any remaining buffered
+ *     data and then signal EOF in the normal way.</li>
+ * <li>The sinks are closed, and the Pipeline is shutdown.</li>
+ * </ol>
+ * <p>
+ * All mutable state is guarded by this object's monitor; blocked readers and
+ * writers wait on that same monitor and are woken with {@code notifyAll()}.
+ *
+ * @author cr...@jn...
+ */
+public class Pipeline {
+    // FIXME This implementation still double-copies data when a reader is
+    // waiting for it, and it doesn't implement atomic writes / reads or
+    // detect situations where behavior is non-deterministic.
+
+    /** Capacity of the pipeline buffer, in bytes. */
+    private static final int BUFFER_SIZE = 1024;
+
+    private final List<PipelineInputStream> sinks =
+        new ArrayList<PipelineInputStream>();
+    private final List<PipelineOutputStream> sources =
+        new ArrayList<PipelineOutputStream>();
+
+    // Undelivered bytes occupy buffer[pos .. lim).  The buffer is set to null
+    // once data can no longer be delivered: after shutdown(), or after the
+    // last sink has been closed.
+    private byte[] buffer = new byte[BUFFER_SIZE];
+    private int pos = 0;
+    private int lim = 0;
+    private boolean activated;
+
+    /**
+     * Create a pipeline, in 'inactive' state.
+     */
+    public Pipeline() {
+    }
+
+    /**
+     * Create a sink for an inactive pipeline.
+     * @return the sink.
+     * @throws IOException This is thrown if the pipeline is 'active' or 'shut down'.
+     */
+    public synchronized PipelineInputStream createSink() throws IOException {
+        if (activated || buffer == null) {
+            throw new IOException("pipeline state wrong");
+        }
+        PipelineInputStream is = new PipelineInputStream(this);
+        sinks.add(is);
+        return is;
+    }
+
+    /**
+     * Create a source for an inactive pipeline.
+     * @return the source.
+     * @throws IOException This is thrown if the pipeline is 'active' or 'shut down'.
+     */
+    public synchronized PipelineOutputStream createSource() throws IOException {
+        if (activated || buffer == null) {
+            throw new IOException("pipeline state wrong");
+        }
+        PipelineOutputStream os = new PipelineOutputStream(this);
+        sources.add(os);
+        return os;
+    }
+
+    /**
+     * Put the pipeline into the 'active' state.
+     * @throws IOException This is thrown if the pipeline is 'shut down', or
+     * if it is 'inactive' but there are no sources or sinks.
+     */
+    public synchronized void activate() throws IOException {
+        if (buffer == null) {
+            throw new IOException("pipeline state wrong");
+        }
+        if (sinks.isEmpty() || sources.isEmpty()) {
+            throw new IOException("pipeline has no inputs and/or outputs");
+        }
+        activated = true;
+    }
+
+    /**
+     * Test if the pipeline is in the 'active' state.
+     * @return <code>true</code> if the pipeline has been activated and has
+     * not been shut down since, either forcibly or by closing all of its
+     * sources or all of its sinks.
+     */
+    public synchronized boolean isActive() {
+        return activated && buffer != null && !sources.isEmpty();
+    }
+
+    /**
+     * Test if the pipeline is in the 'shut down' state.
+     * @return <code>true</code> if {@link #shutdown()} has been called, if
+     * every sink has been closed, or if the pipeline was activated and every
+     * source has since been closed.  In the last case, data that is still
+     * buffered can be read from the sinks.
+     */
+    public synchronized boolean isShutdown() {
+        return buffer == null || (activated && sources.isEmpty());
+    }
+
+    /**
+     * Forcibly shut down the pipeline.  Buffered data is discarded, and any
+     * threads currently blocked on sources or sinks will get an IOException.
+     */
+    public synchronized void shutdown() {
+        discardBuffer();
+    }
+
+    /**
+     * Get the number of buffered bytes that can be read without blocking.
+     * @return the number of undelivered bytes in the buffer.
+     */
+    synchronized int available() {
+        return lim - pos;
+    }
+
+    /**
+     * Deregister a sink.  When the last sink goes away nobody can consume
+     * the data any more, so the buffer is discarded and blocked threads are
+     * woken to fail with an IOException.
+     * @param input the sink that is being closed.
+     */
+    synchronized void closeInput(PipelineInputStream input) {
+        sinks.remove(input);
+        if (sinks.isEmpty()) {
+            discardBuffer();
+        }
+    }
+
+    /**
+     * Deregister a source.  When the last source goes away, the buffer is
+     * deliberately kept so that the sinks can drain the remaining data before
+     * they see EOF; blocked readers are woken so that they notice the EOF.
+     * @param output the source that is being closed.
+     */
+    synchronized void closeOutput(PipelineOutputStream output) {
+        sources.remove(output);
+        if (sources.isEmpty()) {
+            this.notifyAll();
+        }
+    }
+
+    /**
+     * Read bytes from the pipeline.  The call blocks until at least one byte
+     * is available or EOF is reached; it then returns whatever is buffered,
+     * up to <code>len</code> bytes, without waiting for more.
+     * @param b the destination array.
+     * @param off the index in <code>b</code> of the first byte to store.
+     * @param len the maximum number of bytes to read.
+     * @return the number of bytes read, 0 if <code>len</code> is 0, or -1 if
+     * the buffer is empty and there are no sources left to refill it.
+     * @throws IOException if the pipeline has been forcibly shut down or has
+     * no sinks left, or (as an InterruptedIOException) if the calling thread
+     * is interrupted while waiting.
+     * @throws IndexOutOfBoundsException if <code>off</code> and
+     * <code>len</code> do not describe a range within <code>b</code>.
+     */
+    synchronized int read(byte[] b, int off, int len) throws IOException {
+        if (buffer == null) {
+            throw new IOException("pipeline state wrong");
+        }
+        checkRange(b, off, len);
+        if (len == 0) {
+            return 0;
+        }
+        while (pos == lim) {
+            if (sources.isEmpty()) {
+                // Drained, and nobody is left to write more: clean EOF.
+                return -1;
+            }
+            awaitChange();
+        }
+        int count = Math.min(len, lim - pos);
+        System.arraycopy(buffer, pos, b, off, count);
+        consume(count);
+        return count;
+    }
+
+    /**
+     * Skip over bytes in the pipeline.  The call blocks until <code>n</code>
+     * bytes have been skipped or EOF is reached.
+     * @param n the number of bytes to skip.
+     * @return the number of bytes actually skipped; this is less than
+     * <code>n</code> only if EOF was reached first, and 0 if <code>n</code>
+     * is not positive.
+     * @throws IOException if the pipeline has been forcibly shut down or has
+     * no sinks left, or (as an InterruptedIOException) if the calling thread
+     * is interrupted while waiting.
+     */
+    synchronized long skip(long n) throws IOException {
+        if (buffer == null) {
+            throw new IOException("pipeline state wrong");
+        }
+        long skipped = 0;
+        while (skipped < n) {
+            while (pos == lim) {
+                if (sources.isEmpty()) {
+                    return skipped;
+                }
+                awaitChange();
+            }
+            // Bounded by (lim - pos), so the narrowing cast cannot overflow.
+            int count = (int) Math.min((long) (lim - pos), n - skipped);
+            consume(count);
+            skipped += count;
+        }
+        return skipped;
+    }
+
+    /**
+     * Wake up any threads that are waiting on the pipeline.  Written data is
+     * visible to readers immediately, so there is nothing else to push out.
+     * @throws IOException if the pipeline has been forcibly shut down or has
+     * no sinks left.
+     */
+    synchronized void flush() throws IOException {
+        if (buffer == null) {
+            throw new IOException("pipeline state wrong");
+        }
+        this.notifyAll();
+    }
+
+    /**
+     * Write bytes to the pipeline, blocking whenever the buffer is full until
+     * a reader has made room.
+     * @param b the array holding the data.
+     * @param off the index in <code>b</code> of the first byte to write.
+     * @param len the number of bytes to write.
+     * @throws IOException if the pipeline has been forcibly shut down or has
+     * no sinks left to consume the data, or (as an InterruptedIOException)
+     * if the calling thread is interrupted while waiting.
+     * @throws IndexOutOfBoundsException if <code>off</code> and
+     * <code>len</code> do not describe a range within <code>b</code>.
+     */
+    synchronized void write(byte[] b, int off, int len) throws IOException {
+        if (buffer == null) {
+            throw new IOException("pipeline state wrong");
+        }
+        checkRange(b, off, len);
+        int end = off + len;
+        while (off < end) {
+            while (!makeRoom()) {
+                if (sinks.isEmpty()) {
+                    throw new IOException("pipeline broken");
+                }
+                awaitChange();
+            }
+            int count = Math.min(end - off, buffer.length - lim);
+            System.arraycopy(b, off, buffer, lim, count);
+            lim += count;
+            off += count;
+            // Readers and writers share this monitor, so a plain notify()
+            // could wake another writer and leave every reader asleep.
+            this.notifyAll();
+        }
+    }
+
+    /**
+     * Mark <code>count</code> buffered bytes as delivered and wake up any
+     * writers that are waiting for space.
+     */
+    private void consume(int count) {
+        pos += count;
+        if (pos == lim) {
+            pos = 0;
+            lim = 0;
+        }
+        this.notifyAll();
+    }
+
+    /**
+     * Ensure there is free space at the end of the buffer, sliding the
+     * undelivered bytes to the front if that is what it takes.
+     * @return <code>false</code> if the buffer is completely full.
+     */
+    private boolean makeRoom() {
+        if (lim == buffer.length && pos > 0) {
+            System.arraycopy(buffer, pos, buffer, 0, lim - pos);
+            lim -= pos;
+            pos = 0;
+        }
+        return lim < buffer.length;
+    }
+
+    /**
+     * Wait on the pipeline monitor until another thread changes its state.
+     * @throws IOException if the pipeline was shut down in the meantime, or
+     * (as an InterruptedIOException) if the calling thread was interrupted.
+     */
+    private void awaitChange() throws IOException {
+        try {
+            this.wait();
+        } catch (InterruptedException ex) {
+            // Keep the interrupt visible to callers further up the stack.
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
+        }
+        if (buffer == null) {
+            throw new IOException("pipeline shut down");
+        }
+    }
+
+    /**
+     * Drop the buffer and its contents, then wake every waiting thread so
+     * that it can fail rather than block forever.
+     */
+    private void discardBuffer() {
+        buffer = null;
+        pos = 0;
+        lim = 0;
+        this.notifyAll();
+    }
+
+    /**
+     * Check that <code>off</code> and <code>len</code> describe a range that
+     * lies within <code>b</code>.
+     */
+    private static void checkRange(byte[] b, int off, int len) {
+        if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException(
+                "off=" + off + ", len=" + len + ", array length=" + b.length);
+        }
+    }
+}
Added: trunk/shell/src/shell/org/jnode/shell/io/PipelineInputStream.java
===================================================================
--- trunk/shell/src/shell/org/jnode/shell/io/PipelineInputStream.java (rev 0)
+++ trunk/shell/src/shell/org/jnode/shell/io/PipelineInputStream.java 2009-03-10 14:34:35 UTC (rev 5095)
@@ -0,0 +1,95 @@
+package org.jnode.shell.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * This class provides the input side of a JNode shell pipeline.
+ * Unlike the standard PipedInputStream, this one is designed to
+ * support a pipeline with multiple readers and writers.
+ *
+ * @author cr...@jn...
+ */
+public final class PipelineInputStream extends InputStream {
+
+ private Pipeline pipeline;
+
+ /**
+ * This is not a public constructor. Use {@link Pipeline#createSink()}
+ * to create a PipelineInputStream instance.
+ *
+ * @param pipeline the parent pipeline.
+ */
+ PipelineInputStream(Pipeline pipeline) {
+ this.pipeline = pipeline;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (pipeline == null) {
+ throw new IOException("pipeline closed");
+ }
+ byte[] buffer = new byte[1];
+ int got = pipeline.read(buffer, 0, 1);
+ if (got == 1) {
+ return buffer[0];
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public int available() throws IOException {
+ if (pipeline == null) {
+ throw new IOException("pipeline closed");
+ }
+ return pipeline.available();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (pipeline != null) {
+ pipeline.closeInput(this);
+ pipeline = null;
+ }
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ // ignore
+ }
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (pipeline == null) {
+ throw new IOException("pipeline closed");
+ }
+ return pipeline.read(b, off, len);
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ if (pipeline == null) {
+ throw new IOException("pipeline closed");
+ }
+ return pipeline.read(b, 0, b.length);
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ throw new IOException("reset not supported");
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (pipeline == null) {
+ throw new IOException("pipeline closed");
+ }
+ return pipeline.skip(n);
+ }
+}
Added: trunk/shell/src/shell/org/jnode/shell/io/PipelineOutputStream.java
===================================================================
--- trunk/shell/src/shell/org/jnode/shell/io/PipelineOutputStream.java (rev 0)
+++ trunk/shell/src/shell/org/jnode/shell/io/PipelineOutputStream.java 2009-03-10 14:34:35 UTC (rev 5095)
@@ -0,0 +1,68 @@
+package org.jnode.shell.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * This class provides the output side of a JNode shell pipeline.
+ * Unlike the standard PipedInputStream, this one is designed to
+ * support a pipeline with multiple readers and writers.
+ *
+ * @author cr...@jn...
+ */
+public final class PipelineOutputStream extends OutputStream {
+
+ private Pipeline pipeline;
+
+ /**
+ * This is not a public constructor. Use {@link Pipeline#createSource()}
+ * to create a PipelineOutputStream instance.
+ *
+ * @param pipeline the parent pipeline.
+ */
+ PipelineOutputStream(Pipeline pipeline) {
+ this.pipeline = pipeline;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (pipeline != null) {
+ pipeline.closeOutput(this);
+ pipeline = null;
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (pipeline == null) {
+ throw new IOException("pipeline closed");
+ }
+ pipeline.flush();
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (pipeline == null) {
+ throw new IOException("pipeline closed");
+ }
+ pipeline.write(b, off, len);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ if (pipeline == null) {
+ throw new IOException("pipeline closed");
+ }
+ pipeline.write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (pipeline == null) {
+ throw new IOException("pipeline closed");
+ }
+ byte[] buffer = new byte[]{(byte) b};
+ pipeline.write(buffer, 0, 1);
+ }
+
+}
Added: trunk/shell/src/test/org/jnode/test/shell/io/PipelineTest.java
===================================================================
--- trunk/shell/src/test/org/jnode/test/shell/io/PipelineTest.java (rev 0)
+++ trunk/shell/src/test/org/jnode/test/shell/io/PipelineTest.java 2009-03-10 14:34:35 UTC (rev 5095)
@@ -0,0 +1,100 @@
+/*
+ * $Id$
+ *
+ * Copyright (C) 2003-2009 JNode.org
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; If not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+package org.jnode.test.shell.io;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import junit.framework.TestCase;
+
+import org.jnode.shell.io.Pipeline;
+
+/**
+ * Single-threaded tests for the lifecycle and basic data transfer of
+ * {@link Pipeline}.
+ */
+public class PipelineTest extends TestCase {
+
+    /** Sample payload; far shorter than the pipeline buffer. */
+    private static final String TEXT = "the quick brown fox";
+
+    /** A pipeline can be instantiated without error. */
+    public void testConstructor() {
+        new Pipeline();
+    }
+
+    /** Closing the sink and the source takes an active pipeline to 'shut down'. */
+    public void testLifecycle() throws IOException {
+        Pipeline pipeline = new Pipeline();
+        InputStream sink = pipeline.createSink();
+        OutputStream source = pipeline.createSource();
+        assertState(pipeline, false, false);
+
+        pipeline.activate();
+        assertState(pipeline, true, false);
+
+        sink.close();
+        source.close();
+        assertState(pipeline, false, true);
+    }
+
+    /** A forced shutdown makes later reads and writes fail with an IOException. */
+    public void testLifecycle2() throws IOException {
+        Pipeline pipeline = new Pipeline();
+        InputStream sink = pipeline.createSink();
+        OutputStream source = pipeline.createSource();
+        assertState(pipeline, false, false);
+
+        pipeline.activate();
+        assertState(pipeline, true, false);
+
+        pipeline.shutdown();
+        assertState(pipeline, false, true);
+
+        try {
+            sink.read();
+            fail("no exception on read()");
+        } catch (IOException ex) {
+            // expected ...
+        }
+        try {
+            source.write('X');
+            fail("no exception on write()");
+        } catch (IOException ex) {
+            // expected ...
+        }
+    }
+
+    /**
+     * One source and one sink driven from the same thread.  This is safe
+     * only because each transfer is smaller than the pipeline buffer, so
+     * neither side ever has to block.
+     */
+    public void testOneOne() throws IOException {
+        Pipeline pipeline = new Pipeline();
+        InputStream sink = pipeline.createSink();
+        OutputStream source = pipeline.createSource();
+        pipeline.activate();
+
+        assertEquals(0, sink.available());
+        source.write('A');
+        assertEquals(1, sink.available());
+        assertEquals('A', sink.read());
+
+        source.write('B');
+        assertEquals('B', sink.read());
+        assertEquals(0, sink.available());
+
+        source.write(TEXT.getBytes());
+        int expected = TEXT.length();
+        assertEquals(expected, sink.available());
+        byte[] received = new byte[100];
+        assertEquals(expected, sink.read(received, 0, expected));
+        assertEquals(TEXT, new String(received, 0, expected));
+    }
+
+    /**
+     * Assert the values reported by {@link Pipeline#isActive()} and
+     * {@link Pipeline#isShutdown()}.
+     */
+    private static void assertState(Pipeline pipeline, boolean active, boolean shutdown) {
+        assertEquals(active, pipeline.isActive());
+        assertEquals(shutdown, pipeline.isShutdown());
+    }
+}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|