[Beepcore-java-commits] CVS: beepcore-java/src/org/beepcore/beep/transport/tcp TCPSession.java,1.18,1.19
Status: Beta
Brought to you by:
huston
From: Huston F. <hu...@us...> - 2001-11-27 07:31:24
|
Update of /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/transport/tcp In directory usw-pr-cvs1:/tmp/cvs-serv9482/src/org/beepcore/beep/transport/tcp Modified Files: TCPSession.java Log Message: reduced the number of calls to read() Index: TCPSession.java =================================================================== RCS file: /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/transport/tcp/TCPSession.java,v retrieving revision 1.18 retrieving revision 1.19 diff -C2 -r1.18 -r1.19 *** TCPSession.java 2001/11/27 02:39:03 1.18 --- TCPSession.java 2001/11/27 07:31:21 1.19 *************** *** 66,69 **** --- 66,77 ---- private static final String TCP_MAPPING = "TCP Mapping"; private static final String CRLF = "\r\n"; + private static final int MIN_SEQ_HEADER_SIZE = (3 // msg type + + 1 // space + + 1 // channel number + + 1 // space + + 1 // acknum + + 1 // space + + 1 // window + + CRLF.length()); private static final int CHANNEL_START_ODD = 1; *************** *** 227,231 **** } ! thread = new Thread(new SessionThread(), threadName); thread.setDaemon(true); --- 235,244 ---- } ! thread = new Thread(threadName) ! { ! public void run() { ! processNextFrame(); ! } ! }; thread.setDaemon(true); *************** *** 411,452 **** } - // Lame hack for J++ - private boolean modState(int i) throws BEEPException - { - return super.changeState(i); - } - - /** - * This specialization is designed to process Frames unique - * to the TCP mapping (e.g. SEQ frames). If the Frame is - * not an SEQ frame, it calls the regular TCPSession - * processNextFrame( int ) with an integer that simply - * indicates how far it's read ahead into the stream. - * @exception Throws an exception if it encounters a poorly - * formed header or an IOException in the underlying socket - * stream. - * - * @throws BEEPException - */ private void processNextFrame() - throws BEEPException, IOException, SessionAbortedException { ! if (Log.isLogged(Log.SEV_DEBUG_VERBOSE)) { ! 
Log.logEntry(Log.SEV_DEBUG_VERBOSE, TCP_MAPPING, ! "Processing next frame"); } ! int length = 0; ! InputStream is = socket.getInputStream(); while (true) { try { ! int b = is.read(); ! if (b == -1) { throw new SessionAbortedException(); } ! headerBuffer[length] = (byte) b; } catch (java.net.SocketException e) { --- 424,517 ---- } private void processNextFrame() { ! running = true; ! ! try { ! InputStream is = socket.getInputStream(); ! ! while (running) { ! if (getState() == SESSION_STATE_CLOSING || ! getState() == SESSION_STATE_TERMINATING || ! getState() == SESSION_STATE_CLOSED) ! { ! break; ! } ! ! if (Log.isLogged(Log.SEV_DEBUG_VERBOSE)) { ! Log.logEntry(Log.SEV_DEBUG_VERBOSE, TCP_MAPPING, ! "Processing next frame"); ! } ! ! int amountRead; ! ! try { ! do { ! amountRead = ! is.read(headerBuffer, 0, MIN_SEQ_HEADER_SIZE); ! ! if (amountRead == -1) { ! throw new SessionAbortedException(); ! } ! } while (amountRead == 0); ! ! } catch (java.net.SocketException e) { ! if (getState() == SESSION_STATE_ACTIVE) { ! throw e; ! } ! ! // socket closed intentionally (session closing) ! // so just return ! return; ! } ! ! if (headerBuffer[0] == (byte) SEQ_PREFIX.charAt(0)) { ! processSEQFrame(headerBuffer, amountRead, is); ! continue; ! } else { ! processCoreFrame(headerBuffer, amountRead, is); ! } ! } ! } catch (IOException e) { ! Log.logEntry(Log.SEV_ERROR, TCP_MAPPING, e); ! ! socket = null; ! ! terminate(e.getMessage()); ! } catch (SessionAbortedException e) { ! terminate("Session aborted by remote peer."); ! } catch (Throwable e) { ! Log.logEntry(Log.SEV_ERROR, TCP_MAPPING, e); ! terminate(e.getMessage()); } ! Log.logEntry(Log.SEV_DEBUG, TCP_MAPPING, ! "Session listener thread exiting. State = " ! + TCPSession.this.getState()); ! 
} + private void processCoreFrame(byte[] headerBuffer, int amountRead, + InputStream is) + throws SessionAbortedException, BEEPException, IOException + { + int headerLength = 0; + int amountToRead = Frame.MIN_FRAME_SIZE - amountRead; + + headerFound: while (true) { + int tokenCount = 6; + try { ! int n = is.read(headerBuffer, amountRead, amountToRead); ! if (n == -1) { throw new SessionAbortedException(); } + + if (n == 0) { + continue; + } ! amountRead += n; } catch (java.net.SocketException e) { *************** *** 455,526 **** } ! // socket closed intentionally (session closing) so just return return; } ! if (headerBuffer[length] == '\n') { ! if ((length == 0) || (headerBuffer[length - 1] != '\r')) { ! throw new BEEPException("Malformed BEEP header"); } ! --length; ! break; } - - ++length; ! if (length == Frame.MAX_HEADER_SIZE) { throw new BEEPException("Malformed BEEP header, no CRLF"); } } ! if (Log.isLogged(Log.SEV_DEBUG)) { ! Log.logEntry(Log.SEV_DEBUG, TCP_MAPPING, ! "Processing: " ! + new String(headerBuffer, 0, length)); } ! // If this is not a SEQ frame build a <code>Frame</code> and ! // read in the payload and verify the TRAILER. ! if (headerBuffer[0] != (byte) SEQ_PREFIX.charAt(0)) { ! Frame f = super.createFrame(headerBuffer, length); ! byte[] payload = new byte[f.getSize()]; ! for (int count = 0; count < payload.length; ) { ! int n = is.read(payload, count, payload.length - count); ! if (n == -1) { ! throw new SessionAbortedException(); } ! count += n; } ! if (Log.isLogged(Log.SEV_DEBUG_VERBOSE)) { ! Log.logEntry(Log.SEV_DEBUG_VERBOSE, TCP_MAPPING, ! new String(payload)); } ! for (int i = 0; i < Frame.TRAILER.length(); ++i) { ! int b = is.read(); ! if (b == -1) { throw new SessionAbortedException(); } ! if (((byte) b) != ((byte) Frame.TRAILER.charAt(i))) { ! throw new BEEPException("Malformed BEEP frame, " ! + "invalid trailer"); } - } ! f.addPayload(new BufferSegment(payload)); ! super.postFrame(f); ! 
return; } // Process the header StringTokenizer st = new StringTokenizer(new String(headerBuffer, 0, ! length)); if (st.countTokens() != 4) { --- 520,670 ---- } ! // socket closed intentionally (session closing) ! // so just return ! running = false; return; } ! while (headerLength < amountRead) { ! if (headerBuffer[headerLength] == '\n') { ! if (headerLength == 0 || ! headerBuffer[headerLength - 1] != '\r') ! { ! throw new BEEPException("Malformed BEEP header"); ! } ! ! ++headerLength; ! break headerFound; } ! if (headerBuffer[headerLength] == ' ') { ! if (tokenCount > 1) { // This is for ANS frames ! --tokenCount; ! } ! } ! ++headerLength; } ! if (headerLength > Frame.MAX_HEADER_SIZE) { throw new BEEPException("Malformed BEEP header, no CRLF"); } + + /* 2 = 1 for the min token size and 1 is for the separator ' ' + * or "\r\n" + */ + amountToRead = (tokenCount * 2) + Frame.TRAILER.length(); } ! if (Log.isLogged(Log.SEV_DEBUG_VERBOSE)) { ! Log.logEntry(Log.SEV_DEBUG_VERBOSE, TCP_MAPPING, ! new String(headerBuffer, 0, headerLength)); } ! Frame f = super.createFrame(headerBuffer, ! headerLength - CRLF.length()); ! byte[] payload = new byte[f.getSize()]; ! int count = amountRead - headerLength; ! System.arraycopy(headerBuffer, headerLength, payload, 0, count); ! ! while (count < payload.length) { ! int n = is.read(payload, count, payload.length - count); ! if (n == -1) { ! throw new SessionAbortedException(); ! } ! count += n; ! } ! ! if (Log.isLogged(Log.SEV_DEBUG_VERBOSE)) { ! Log.logEntry(Log.SEV_DEBUG_VERBOSE, TCP_MAPPING, ! new String(payload)); ! } ! ! for (int i = 0; i < Frame.TRAILER.length(); ++i) { ! int b = is.read(); ! ! if (b == -1) { ! throw new SessionAbortedException(); ! } ! ! if (((byte) b) != ((byte) Frame.TRAILER.charAt(i))) { ! throw new BEEPException("Malformed BEEP frame, " ! + "invalid trailer"); ! } ! } ! ! f.addPayload(new BufferSegment(payload)); ! super.postFrame(f); ! } ! ! 
private void processSEQFrame(byte[] headerBuffer, int amountRead, ! InputStream is) ! throws BEEPException, IOException, SessionAbortedException ! { ! int headerLength = 0; ! int tokenCount = 4; ! ! headerFound: ! while (true) { ! ! while (headerLength < amountRead) { ! if (headerBuffer[headerLength] == '\n') { ! if (headerLength == 0 || ! headerBuffer[headerLength - 1] != '\r') ! { ! throw new BEEPException("Malformed BEEP header"); ! } ! ! ++headerLength; ! break headerFound; } ! ! if (headerBuffer[headerLength] == ' ') { ! if (tokenCount > 1) { ! --tokenCount; ! } ! } ! ! ++headerLength; } ! if (headerLength > Frame.MAX_HEADER_SIZE) { ! throw new BEEPException("Malformed BEEP header, no CRLF"); } ! int amountToRead = headerBuffer[headerLength - 1] == '\r' ? 1 : ! tokenCount * 2; ! try { ! /* 2 = 1 for the min token size and 1 is for the separator ' ' ! * or "\r\n" ! */ ! int n = is.read(headerBuffer, amountRead, amountToRead); ! if (n == -1) { throw new SessionAbortedException(); } ! if (n == 0) { ! continue; } ! amountRead += n; ! ! } catch (java.net.SocketException e) { ! if (getState() == SESSION_STATE_ACTIVE) { ! throw e; ! } ! // socket closed intentionally (session closing) ! // so just return ! running = false; ! return; ! } } // Process the header StringTokenizer st = new StringTokenizer(new String(headerBuffer, 0, ! headerLength)); if (st.countTokens() != 4) { *************** *** 549,588 **** this.updatePeerReceiveBufferSize(channelNum, ackNum, window); } ! ! private class SessionThread implements Runnable { ! ! public void run() ! { ! try { ! running = true; ! ! // Listen for and post the Greeting Frame ! while ((getState() != SESSION_STATE_ACTIVE) && running) { ! processNextFrame(); ! } ! ! // Keep processing frames as long as we're active ! while ((getState() == SESSION_STATE_ACTIVE) && running) { ! processNextFrame(); ! } ! } catch (IOException e) { ! Log.logEntry(Log.SEV_ERROR, TCP_MAPPING, e); ! ! socket = null; ! ! terminate(e.getMessage()); ! 
} catch (SessionAbortedException e) { ! terminate("Session aborted by remote peer."); ! } catch (Throwable e) { ! Log.logEntry(Log.SEV_ERROR, TCP_MAPPING, e); ! terminate(e.getMessage()); ! } ! ! Log.logEntry(Log.SEV_DEBUG, TCP_MAPPING, ! "Session listener thread exiting. State = " ! + TCPSession.this.getState()); ! } ! } ! private static class SessionAbortedException extends Exception { } --- 693,697 ---- this.updatePeerReceiveBufferSize(channelNum, ackNum, window); } ! private static class SessionAbortedException extends Exception { } |