From: Ron S. <ron...@ya...> - 2005-08-17 06:10:09
|
User: rsigal Date: 05/08/17 02:10:04 Modified: src/main/org/jboss/remoting/transport/multiplex MultiplexingOutputStream.java Protocol.java MultiplexingInputStream.java MultiPortVirtualServerSocket.java VirtualSocket.java MultiplexingManager.java Log: Added support for communicating shutdown of remote input and output streams. Revision Changes Path 1.7 +8 -4 JBossRemoting/src/main/org/jboss/remoting/transport/multiplex/MultiplexingOutputStream.java (In the diff below, changes in quantity of whitespace are not shown.) Index: MultiplexingOutputStream.java =================================================================== RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/multiplex/MultiplexingOutputStream.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -b -r1.6 -r1.7 --- MultiplexingOutputStream.java 16 Aug 2005 06:57:23 -0000 1.6 +++ MultiplexingOutputStream.java 17 Aug 2005 06:10:04 -0000 1.7 @@ -12,6 +12,8 @@ import java.io.OutputStream; import java.net.SocketException; +import org.jboss.logging.Logger; + /** * <p> * Copyright (c) 2005 @@ -20,6 +22,7 @@ */ public class MultiplexingOutputStream extends OutputStream { + protected static final Logger log = Logger.getLogger(MultiplexingOutputStream.class); private MultiplexingManager manager; private OutputMultiplexor outputMultiplexor; private VirtualSocket virtualSocket; @@ -113,11 +116,11 @@ */ protected void checkStatus() throws IOException { + if (closed) + throw new SocketException("Socket closed"); + if (shutdown) throw new SocketException("Broken pipe"); - - if (closed) - throw new IOException("socket is closed"); } @@ -126,8 +129,9 @@ * * */ - protected void handleRemoteOutputShutdown() + protected void handleRemoteInputShutdown() { + log.debug("entering handleRemoteInputShutdown()"); shutdown = true; } 1.8 +1 -1 JBossRemoting/src/main/org/jboss/remoting/transport/multiplex/Protocol.java (In the diff below, changes in quantity of whitespace are not shown.) Index: Protocol.java =================================================================== RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/multiplex/Protocol.java,v retrieving revision 1.7 retrieving revision 1.8 diff -u -b -r1.7 -r1.8 --- Protocol.java 16 Aug 2005 06:57:23 -0000 1.7 +++ Protocol.java 17 Aug 2005 06:10:04 -0000 1.8 @@ -363,7 +363,7 @@ case MP_OUTPUT_SHUTDOWN: port = is.readInt(); - log.info("back channel thread: read INPUT_SHUTDOWN for port: " + port); + log.info("back channel thread: read OUTPUT_SHUTDOWN for port: " + port); socket = manager.getSocketByLocalPort(new SocketId(port)); if (socket == null) 1.4 +180 -9 JBossRemoting/src/main/org/jboss/remoting/transport/multiplex/MultiplexingInputStream.java (In the diff below, changes in quantity of whitespace are not shown.) Index: MultiplexingInputStream.java =================================================================== RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/multiplex/MultiplexingInputStream.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -b -r1.3 -r1.4 --- MultiplexingInputStream.java 16 Aug 2005 06:57:23 -0000 1.3 +++ MultiplexingInputStream.java 17 Aug 2005 06:10:04 -0000 1.4 @@ -10,10 +10,13 @@ import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.net.SocketException; +import org.jboss.logging.Logger; + /** * * <p> @@ -23,18 +26,22 @@ */ public class MultiplexingInputStream extends PipedInputStream { + protected static final Logger log = Logger.getLogger(MultiplexingInputStream.class); private VirtualSocket socket; + private PipedOutputStream sourceStream; private boolean eof = false; private boolean closed = false; private boolean remoteShutDownPending = false; + private Thread readThread; /** * */ - public MultiplexingInputStream(VirtualSocket socket) + public MultiplexingInputStream(PipedOutputStream sourceStream, VirtualSocket socket) throws IOException { - super(); + super(sourceStream); + this.sourceStream = sourceStream; this.socket = socket; } @@ -43,9 +50,10 @@ * @param src * @throws java.io.IOException */ - public MultiplexingInputStream(PipedOutputStream src) throws IOException + public MultiplexingInputStream(PipedOutputStream sourceStream) throws IOException { - super(src); + super(sourceStream); + this.sourceStream = sourceStream; } @@ -76,14 +84,40 @@ try { - return super.read(); + synchronized(this) + { + readThread = Thread.currentThread(); + log.debug("read(): calling super.read()"); + int b = super.read(); + readThread = null; + log.debug("read(): returning from super.read()"); + return b; + } + } + catch (InterruptedIOException e) + { + if (remoteShutDownPending) + { + remoteShutDownPending = false; + setEOF(); + return -1; + } + + throw e; } catch (IOException e) { if (closed) + { throw new SocketException("Socket closed"); + } else if (remoteShutDownPending) + { // if notified of remote output shutdown during super.read() + log.debug("read(): interrupted due to remote shutdown pending"); + remoteShutDownPending = false; + setEOF(); return -1; + } else throw e; } @@ -120,12 +154,42 @@ try { - return super.read(bytes, off, len); + // We leave a reference to the current thread so that handleRemoteShutdown() can + // interrupt it if necessary. + synchronized(this) + { + readThread = Thread.currentThread(); + log.debug("read(): calling super.read()"); + int n = super.read(bytes, off, len); + readThread = null; + log.debug("read(): returning from super.read()"); + return n; + } + } + catch (InterruptedIOException e) + { + if (remoteShutDownPending) + { + remoteShutDownPending = false; + setEOF(); + return -1; + } + + throw e; } catch (IOException e) { if (closed) + { throw new SocketException("Socket closed"); + } + else if (remoteShutDownPending) + { // if notified of remote output shutdown during super.read() + log.debug("read(): interrupted due to remote shutdown pending"); + remoteShutDownPending = false; + setEOF(); + return -1; + } else throw e; } @@ -155,13 +219,26 @@ */ protected void handleRemoteShutdown() throws IOException { + log.debug("entering handleRemoteShutdown()"); + if (eof) return; remoteShutDownPending = true; + if (available() == 0) + { + // Hack Alert. If a thread is blocked in read(), we need to interrupt it. + // This works because when PipedInputStream has no bytes to read, it loops in a wait(). + synchronized(this) + { + if (readThread != null) + readThread.interrupt(); + } + } + //temporary - super.close(); + //super.close(); } @@ -171,4 +248,98 @@ { eof = true; } + + + class IOExceptionWrapper + { + private IOException e; + IOException get() {return e;} + void set(IOException e) {this.e = e;} + boolean isSet() {return e != null;} + } + + class IntWrapper + { + private int i; + private boolean set = false; + int get() {return i;} + void set(int i) {this.i = i; set = true;} + boolean isSet() {return set;} + } + +/** + * FIXME Comment this + * + * @return + */ + protected int timedRead() throws IOException + { + final IntWrapper b = new IntWrapper(); + final IOExceptionWrapper readException = new IOExceptionWrapper(); + final Thread thisThread = Thread.currentThread(); + final Object lock = new Object(); + + final Thread readThread = new Thread() + { + public void run() + { + try + { + synchronized(lock) + { + log.info("readThread: entered sync section"); + b.set(MultiplexingInputStream.super.read()); + lock.notifyAll(); + } + } + catch (IOException e) + { + readException.set(e); + } + } + }; + + Thread waitThread = new Thread() + { + public void run() + { + synchronized(lock) + { + readThread.start(); + lock.notifyAll(); + try + { + lock.wait(); + } + catch (InterruptedException e) + { + log.info(e); + } + + thisThread.interrupt(); + } + } + }; + + waitThread.start(); + + while(!b.isSet() && !readException.isSet() && !remoteShutDownPending && !eof) + { + try + { + Thread.sleep(2000); + log.debug("timedRead(): waiting 1000"); + } + catch (InterruptedException e) + { + log.info("timeRead(): interrupted"); + } + } + + if (readException.isSet()) + throw readException.get(); + + log.info("timedRead(): returning: " + b.get()); + return b.get(); + } } 1.5 +1 -0 JBossRemoting/src/main/org/jboss/remoting/transport/multiplex/MultiPortVirtualServerSocket.java (In the diff below, changes in quantity of whitespace are not shown.) Index: MultiPortVirtualServerSocket.java =================================================================== RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/multiplex/MultiPortVirtualServerSocket.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -b -r1.4 -r1.5 --- MultiPortVirtualServerSocket.java 6 Aug 2005 06:32:06 -0000 1.4 +++ MultiPortVirtualServerSocket.java 17 Aug 2005 06:10:04 -0000 1.5 @@ -91,6 +91,7 @@ */ public void close() throws IOException { + log.info("MultiPortVirtualServerSocket: closing"); super.close(); } 1.8 +8 -6 JBossRemoting/src/main/org/jboss/remoting/transport/multiplex/VirtualSocket.java (In the diff below, changes in quantity of whitespace are not shown.) Index: VirtualSocket.java =================================================================== RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/multiplex/VirtualSocket.java,v retrieving revision 1.7 retrieving revision 1.8 diff -u -b -r1.7 -r1.8 --- VirtualSocket.java 16 Aug 2005 06:57:23 -0000 1.7 +++ VirtualSocket.java 17 Aug 2005 06:10:04 -0000 1.8 @@ -385,7 +385,7 @@ public synchronized InputStream getInputStream() throws IOException { if (isClosed()) - throw new SocketException("Socket Closed"); + throw new SocketException("Socket is closed"); if (isInputShutdown()) throw new SocketException("Socket input is shutdown"); @@ -417,10 +417,10 @@ public synchronized OutputStream getOutputStream() throws IOException { if (isClosed()) - throw new IOException("Socket Closed"); + throw new SocketException("Socket is closed"); if (isOutputShutdown()) - throw new IOException("Socket output is shutdown"); + throw new SocketException("Socket output is shutdown"); // TODO: return distinct output streams? See PlainSocketImpl. //return new SocketOutputStream(this); @@ -501,6 +501,7 @@ inputStream.setEOF(); inputShutdown = true; + protocol.notifyInputShutdown(localSocketId); } @@ -520,6 +521,7 @@ outputStream.shutdown(); outputShutdown = true; + protocol.notifyOutputShutdown(localSocketId); } @@ -566,7 +568,7 @@ */ public synchronized void handleRemoteInputShutDown() throws IOException { - inputStream.handleRemoteShutdown(); + outputStream.handleRemoteInputShutdown(); } @@ -577,7 +579,7 @@ */ public synchronized void handleRemoteOutputShutDown() throws IOException { - outputStream.handleRemoteOutputShutdown(); + inputStream.handleRemoteShutdown(); } @@ -592,7 +594,7 @@ log.info("virtual socket disconnecting: local port: " + getLocalVirtualPort()); outputStream.flush(); - outputStream.handleRemoteOutputShutdown(); + outputStream.handleRemoteInputShutdown(); receivedDisconnectMessage = true; // TODO 1.5 +2 -2 JBossRemoting/src/main/org/jboss/remoting/transport/multiplex/MultiplexingManager.java (In the diff below, changes in quantity of whitespace are not shown.) Index: MultiplexingManager.java =================================================================== RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/multiplex/MultiplexingManager.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -b -r1.4 -r1.5 --- MultiplexingManager.java 16 Aug 2005 06:57:23 -0000 1.4 +++ MultiplexingManager.java 17 Aug 2005 06:10:04 -0000 1.5 @@ -495,8 +495,8 @@ */ public MultiplexingInputStream getAnInputStream(SocketId socketId, VirtualSocket socket) throws IOException { - MultiplexingInputStream mis = new MultiplexingInputStream(socket); - PipedOutputStream pos = new PipedOutputStream(mis); + PipedOutputStream pos = new PipedOutputStream(); + MultiplexingInputStream mis = new MultiplexingInputStream(pos, socket); outputStreamMap.put(socketId, pos); return mis; } |