Thread: [Jukebox-cvs] CVS update: J4/src/java/gnu/j4/net/telnet Telnet.java
Brought to you by:
vtt
From: CVS B. <vt...@fr...> - 2000-03-22 15:48:25
|
User: vt Date: 00/03/22 08:43:10 Modified: src/java/gnu/j4/net/telnet Telnet.java Log: Checkpoint. Still doesn't work properly. Revision Changes Path 1.4 +290 -116 J4/src/java/gnu/j4/net/telnet/Telnet.java CVSWEB Options: ------------------- CVSWeb: Annotate this file: http://cvs.sourceforge.net/cgi-bin/cvsweb.cgi/J4/src/java/gnu/j4/net/telnet/Telnet.java?annotate=1.4&cvsroot=jukebox4 CVSWeb: View this file: http://cvs.sourceforge.net/cgi-bin/cvsweb.cgi/J4/src/java/gnu/j4/net/telnet/Telnet.java?rev=1.4&content-type=text/x-cvsweb-markup&cvsroot=jukebox4 CVSWeb: Diff to previous version: http://cvs.sourceforge.net/cgi-bin/cvsweb.cgi/J4/src/java/gnu/j4/net/telnet/Telnet.java.diff?r1=1.4&r2=1.3&cvsroot=jukebox4 ----------------------------------- Index: Telnet.java =================================================================== RCS file: /usr/local/cvs/J4/src/java/gnu/j4/net/telnet/Telnet.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- Telnet.java 2000/03/21 23:59:19 1.3 +++ Telnet.java 2000/03/22 15:43:10 1.4 @@ -2,17 +2,22 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; -import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.net.InetAddress; import java.net.Socket; import java.net.UnknownHostException; import java.util.Vector; import java.util.Hashtable; +import gnu.j4.sem.MutexSemaphore; +import gnu.j4.service.ActiveService; +import gnu.j4.service.ServiceUnavailableException; import gnu.j4.service.net.SocketService; -import gnu.j4.util.RingBuffer; /** * Implements just plain vanilla telnet <emph>protocol</emph> that does NOT @@ -32,7 +37,7 @@ * 854</a> for details. * * @author Copyright © <a href="mailto:vt...@fr...">Vadim Tkachenko</a> 1995, 2000 - * @version $Id: Telnet.java,v 1.3 2000/03/21 23:59:19 vt Exp $ + * @version $Id: Telnet.java,v 1.4 2000/03/22 15:43:10 vt Exp $ */ public class Telnet extends SocketService implements TelnetConstants, TelnetOptionConstants { @@ -69,13 +74,63 @@ protected BufferedOutputStream out; /** - * The internal buffer to store the data on the fly. + * User input stream buffer. * - * FIXME: to be replaced by the piped stream. + * The incoming data with telnet protocol escapes filtered out is + * written here and consumed by the {@link #inSource piped input + * stream}. */ - private RingBuffer inBuffer = new RingBuffer(); + protected PipedOutputStream inBuffer; /** + * User input stream. + * + * This stream is coupled with the {@link #inBuffer incoming data + * buffer}. + * + * <p> + * + * This stream is returned by the {@link #getInputStream + * getInputStream()} call. + */ + protected PipedInputStream inSource; + + /** + * User output steam. + * + * This stream is coupled with the {@link #outBuffer outcoming data + * buffer}. + */ + protected PipedInputStream outSource; + + /** + * User output stream buffer. + * + * The data written by the client is buffered for the {@link #outSource + * outgoing data buffer}. + * + * <p> + * + * This stream is returned by the {@link #getOutputStream + * getOutputStream()} call. + */ + protected PipedOutputStream outBuffer; + + /** + * The escape lock. + * + * The reader and writer shouldn't get in the escape handler's way. + */ + protected MutexSemaphore escapeLock = new MutexSemaphore(); + + /** + * The reader thread. + * + * Reads the data that the client stuffs into the {@link #outBuffer output stream}. + */ + protected Reader reader; + + /** * I've requested this and waiting for their WILL. */ protected Hashtable ackIWant = new Hashtable(); @@ -175,12 +230,24 @@ in = new BufferedInputStream(dataSocket.getInputStream()); out = new BufferedOutputStream(dataSocket.getOutputStream()); - - complain(LOG_DEBUG, CH_TELNET, in.getClass().getName()); - complain(LOG_DEBUG, CH_TELNET, out.getClass().getName()); - // bin = new BufferedInputStream(in); - // dataOut = new DataOutputStream(out); + inBuffer = new PipedOutputStream(); + inSource = new PipedInputStream(inBuffer); + + outBuffer = new PipedOutputStream(); + outSource = new PipedInputStream(outBuffer); + + reader = new Reader(outSource, out); + + if ( !reader.start().waitFor() ) { + + throw new ServiceUnavailableException("Couldn't start the reader"); + } + + addDependant(reader); + +// complain(LOG_DEBUG, CH_TELNET, in.getClass().getName()); +// complain(LOG_DEBUG, CH_TELNET, out.getClass().getName()); support(TOPT_SuppressGoAhead); support(TOPT_ECHO); @@ -190,13 +257,15 @@ support(new topt32("115200,57600,38400,9600,1200")); support(new topt33()); } - + /** * Don't kill the sockets, as the parent does, in case when this * service is a dependant. */ protected void shutdown() throws InterruptedException, Throwable { + reader.stop(); + // VT: FIXME: the architecture has changed, and dependsOn is no // more. Find a way to clone the behavior. @@ -220,6 +289,9 @@ } inBuffer = null; + inSource = null; + outBuffer = null; + outSource = null; ackIWant = null; ackTheyWant = null; localStatus = null; @@ -235,111 +307,76 @@ } /** - * How many bytes available to read. - * - * @return How many bytes available to read. + * Handle the protocol escape. * * @exception IOException when the I/O error occurs */ - public synchronized int available() { - return inBuffer.available(); - } - - /** - * Reads a byte of data. This method will block if no input is available. - * @return the byte read, or -1 if the end of the stream is reached. - * @exception IOException if an I/O error has occured. - */ -/** public synchronized int read() throws IOException { - - while ( available() == 0 ) { + protected final void handleEscape() throws InterruptedException, IOException { + + try { - complain(LOG_DEBUG, CH_TELNET, "read:wait"); - - try { + complain(LOG_DEBUG, CH_TELNET, "escape: waiting for the lock"); + escapeLock.waitFor(); + complain(LOG_DEBUG, CH_TELNET, "escape: got the lock"); - // wait(100); - wait(); + int actionCode = in.read(); - } catch ( InterruptedException e ) { + if ( actionCode == IAC ) { - complain(LOG_DEBUG, CH_TELNET, "read: " + e.toString()); + complain(LOG_DEBUG, CH_TELNET, "IAC escaped"); + inBuffer.write(IAC); + return; } + + complain(LOG_DEBUG, CH_TELNET, "remote: "+lookupEscape(actionCode)); + + int option = in.read(); - // VT: FIXME: what if it is already disabled when I read()? Do I - // wait forever? + complain(LOG_DEBUG, CH_TELNET, "remote: "+TelnetOption.getName(option)); - if ( !isEnabled() ) { - - return -1; + switch( actionCode ) { + + case WILL: + + remoteWill(option); + break; + + case DO: + + remoteDo(option); + break; + + case WONT: + + remoteWont(option); + break; + + case DONT: + + remoteDont(option); + break; + + case SB: + + // Start negotiations for the option + + TelnetOptionSB opt = localStatus.getSB(option); + opt.negotiate(in,out); + return; + + default: + + complain(LOG_DEBUG, CH_TELNET, "Got unknown action code " + + lookupEscape(actionCode) + + "("+TelnetOption.getName(option) + + ")"); + break; } - } - - complain(LOG_DEBUG, CH_TELNET, "read:ok"); - - int result = inBuffer.get(); - complain(LOG_DEBUG, CH_TELNET, "buf: " + result + "/" + (char)result); - return result; - } - */ - /** - * Handle protocol escape. - * @exception IOException when the I/O error occurs - */ - protected final void handleEscape() throws IOException { - - int actionCode = in.read(); - - if ( actionCode == IAC ) { - - complain(LOG_DEBUG, CH_TELNET, "IAC escaped"); - inBuffer.put(IAC); - return; - } - - complain(LOG_DEBUG, CH_TELNET, "remote: "+lookupEscape(actionCode)); - - int option = in.read(); - - complain(LOG_DEBUG, CH_TELNET, "remote: "+TelnetOption.getName(option)); - - switch( actionCode ) { - - case WILL: - - remoteWill(option); - break; - - case DO: - - remoteDo(option); - break; - - case WONT: - - remoteWont(option); - break; - - case DONT: - - remoteDont(option); - break; - - case SB: - - // Start negotiations for the option - - TelnetOptionSB opt = localStatus.getSB(option); - opt.negotiate(in,out); - return; - - default: - - complain(LOG_DEBUG, CH_TELNET, "Got unknown action code " - + lookupEscape(actionCode) - + "("+TelnetOption.getName(option) - + ")"); - break; + + } finally { + + escapeLock.release(); + complain(LOG_DEBUG, CH_TELNET, "escape: released the lock"); } } @@ -565,15 +602,14 @@ out.flush(); } - /** - * Blocking-read bytes from the socket, and handle incoming/outbound - * escapes. + /** + * */ - protected void execute() { + protected void execute() throws InterruptedException, Throwable { complain(LOG_DEBUG, CH_TELNET, "started"); - while ( isEnabled() && alive ) { + while ( isEnabled() ) { int aByte = -1; @@ -584,7 +620,7 @@ } catch ( IOException ioex ) { complain(LOG_DEBUG, CH_TELNET, "Reading socket:", ioex); - stop(); + throw ioex; } try { @@ -596,11 +632,10 @@ } else { - // complain(LOG_DEBUG, CH_TELNET, aByte+":"+(char)aByte); + //complain(LOG_DEBUG, CH_TELNET, "incoming: " + aByte + ":" + (char)aByte); if ( aByte == -1 ) { - alive = false; throw new IOException("EOF?"); } else { @@ -613,8 +648,8 @@ out.write(aByte); } - inBuffer.put(aByte); - notify_fake(); + inBuffer.write(aByte); +// notify_fake(); } } @@ -659,5 +694,144 @@ public Socket getSocket() { throw new Error("Use getInputStream() or getOutputStream() instead"); + } + + /** + * Get the input stream. + * + * User can read the data directly from this stream with no regard to the protocol escapes. + * + * @return The clean readable input stream. + * + * @exception IllegalStateException if the service is not ready yet. + */ + public InputStream getInputStream() throws IOException { + + if ( !isReady() ) { + throw new IllegalStateException("Not Ready"); + } + + return inSource; + } + + /** + * Get the output stream. + * + * User can write the data directly into this stream with no regard to the protocol escapes. + * + * @return The writable output stream. + * + * @exception IllegalStateException if the service is not ready yet. + */ + public OutputStream getOutputStream() throws IOException { + + if ( !isReady() ) { + throw new IllegalStateException("Not Ready"); + } + + return outBuffer; + } + + protected class Reader extends ActiveService { + + public static final String CH_TR = "TelnetReader"; + + protected InputStream in; + protected OutputStream out; + + Reader(InputStream in, OutputStream out) { + + this.in = in; + this.out = out; + } + + protected void startup() { + } + + /** + * Read the data from the {@link #outBuffer outBuffer}, acquire the + * {@link #escapeLock lock}, write the data to the socket output + * stream, release the lock, repeat while enabled. + */ + protected void execute() throws InterruptedException { + + while ( !this.isEnabled() ) { + + flush(); + } + } + + protected void shutdown() throws InterruptedException { + + flush(); + } + + /** + * Transfer the data from the input stream to the output stream + * until the input stream data is <code>available()</code>. + * + * Simplistic implementation at the moment. + */ + protected void flush() throws InterruptedException { + + try { + + // Blocking read here + + complain(LOG_DEBUG, CH_TR, "reading single..."); + int c = outSource.read(); + + if ( c == -1 ) { + + throw new IOException("EOF?"); + } + + // Now that I have the character, I have to get the lock + + complain(LOG_DEBUG, CH_TELNET, "reader: waiting for the lock"); + escapeLock.waitFor(); + complain(LOG_DEBUG, CH_TELNET, "reader: got the lock"); + + // and I can write that character in the output stream + + out.write(c); + complain(LOG_DEBUG, CH_TELNET, "outgoing: "+ c + ":" + (char)c); + + // as well as all the other characters remaining in the buffer + + while ( outSource.available() > 0 ) { + + // which are not going to block the operation, because + // available is not zero + + complain(LOG_DEBUG, CH_TR, "reading multiple (" + outSource.available() + " left)"); + c = outSource.read(); + + // until the end of file + + if ( c == -1 ) { + + throw new IOException("EOF?"); + } + + out.write(c); + complain(LOG_DEBUG, CH_TELNET, "outgoing: "+ c + ":" + (char)c); + } + + out.flush(); + + } catch ( IOException ioex ) { + + complain(LOG_ERR, "Telnet.Reader", "pipe:", ioex); + stop(); + + } finally { + + // and don't forget to release the lock. + + escapeLock.release(); + complain(LOG_DEBUG, CH_TELNET, "reader: released the lock"); + } + } } } |
From: CVS B. <vt...@fr...> - 2000-11-02 08:38:28
|
User: vt Date: 00/11/01 23:57:47 Modified: src/java/gnu/j4/net/telnet Telnet.java Log: Checkpoint on the way to implement a shutdown(Throwable failureCause) call, as opposed to no-argument shutdown(). Point is, I want to know why the execute() died, if it did. Revision Changes Path 1.7 +9 -4 J4/src/java/gnu/j4/net/telnet/Telnet.java CVSWEB Options: ------------------- CVSWeb: Annotate this file: http://cvs.sourceforge.net/cgi-bin/cvsweb.cgi/J4/src/java/gnu/j4/net/telnet/Telnet.java?annotate=1.7&cvsroot=jukebox4 CVSWeb: View this file: http://cvs.sourceforge.net/cgi-bin/cvsweb.cgi/J4/src/java/gnu/j4/net/telnet/Telnet.java?rev=1.7&content-type=text/x-cvsweb-markup&cvsroot=jukebox4 CVSWeb: Diff to previous version: http://cvs.sourceforge.net/cgi-bin/cvsweb.cgi/J4/src/java/gnu/j4/net/telnet/Telnet.java.diff?r1=1.7&r2=1.6&cvsroot=jukebox4 ----------------------------------- Index: Telnet.java =================================================================== RCS file: /usr/local/cvs/J4/src/java/gnu/j4/net/telnet/Telnet.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- Telnet.java 2000/05/05 02:51:58 1.6 +++ Telnet.java 2000/11/02 06:57:47 1.7 @@ -37,7 +37,7 @@ * 854</a> for details. * * @author Copyright © <a href="mailto:vt...@fr...">Vadim Tkachenko</a> 1995, 2000 - * @version $Id: Telnet.java,v 1.6 2000/05/05 02:51:58 vt Exp $ + * @version $Id: Telnet.java,v 1.7 2000/11/02 06:57:47 vt Exp $ */ public class Telnet extends SocketService implements TelnetConstants, TelnetOptionConstants { @@ -261,10 +261,15 @@ * Don't kill the sockets, as the parent does, in case when this * service is a dependant. */ - protected void shutdown() throws InterruptedException, Throwable { + protected void shutdown(Throwable failureCause) throws InterruptedException, Throwable { complain(LOG_DEBUG, CH_TELNET, "Shutting down"); + if ( failureCause != null ) { + + complain(LOG_ERR, CH_TELNET, "FIXME: execute() failure cause not processed:", failureCause); + } + if ( reader.isEnabled() ) { complain(LOG_DEBUG, CH_TELNET, "Stopping reader"); reader.stop().waitFor(); @@ -275,7 +280,7 @@ // if ( dependsOn == null && alive ) { - super.shutdown(); + super.shutdown(failureCause); // } } @@ -774,7 +779,7 @@ } } - protected void shutdown() throws InterruptedException { + protected void shutdown(Throwable failureCause) throws InterruptedException { complain(LOG_DEBUG, CH_TR, "Shutting down"); flush(); |