[Beepcore-java-commits] CVS: beepcore-java/src/org/beepcore/beep/transport/tcp TCPSession.java,1.18,
Status: Beta
Brought to you by:
huston
|
From: Huston F. <hu...@us...> - 2001-11-27 07:31:24
|
Update of /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/transport/tcp
In directory usw-pr-cvs1:/tmp/cvs-serv9482/src/org/beepcore/beep/transport/tcp
Modified Files:
TCPSession.java
Log Message:
reduced the number of calls to read()
Index: TCPSession.java
===================================================================
RCS file: /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/transport/tcp/TCPSession.java,v
retrieving revision 1.18
retrieving revision 1.19
diff -C2 -r1.18 -r1.19
*** TCPSession.java 2001/11/27 02:39:03 1.18
--- TCPSession.java 2001/11/27 07:31:21 1.19
***************
*** 66,69 ****
--- 66,77 ----
private static final String TCP_MAPPING = "TCP Mapping";
private static final String CRLF = "\r\n";
+ private static final int MIN_SEQ_HEADER_SIZE = (3 // msg type
+ + 1 // space
+ + 1 // channel number
+ + 1 // space
+ + 1 // acknum
+ + 1 // space
+ + 1 // window
+ + CRLF.length());
private static final int CHANNEL_START_ODD = 1;
***************
*** 227,231 ****
}
! thread = new Thread(new SessionThread(), threadName);
thread.setDaemon(true);
--- 235,244 ----
}
! thread = new Thread(threadName)
! {
! public void run() {
! processNextFrame();
! }
! };
thread.setDaemon(true);
***************
*** 411,452 ****
}
- // Lame hack for J++
- private boolean modState(int i) throws BEEPException
- {
- return super.changeState(i);
- }
-
- /**
- * This specialization is designed to process Frames unique
- * to the TCP mapping (e.g. SEQ frames). If the Frame is
- * not an SEQ frame, it calls the regular TCPSession
- * processNextFrame( int ) with an integer that simply
- * indicates how far it's read ahead into the stream.
- * @exception Throws an exception if it encounters a poorly
- * formed header or an IOException in the underlying socket
- * stream.
- *
- * @throws BEEPException
- */
private void processNextFrame()
- throws BEEPException, IOException, SessionAbortedException
{
! if (Log.isLogged(Log.SEV_DEBUG_VERBOSE)) {
! Log.logEntry(Log.SEV_DEBUG_VERBOSE, TCP_MAPPING,
! "Processing next frame");
}
! int length = 0;
! InputStream is = socket.getInputStream();
while (true) {
try {
! int b = is.read();
! if (b == -1) {
throw new SessionAbortedException();
}
! headerBuffer[length] = (byte) b;
} catch (java.net.SocketException e) {
--- 424,517 ----
}
private void processNextFrame()
{
! running = true;
!
! try {
! InputStream is = socket.getInputStream();
!
! while (running) {
! if (getState() == SESSION_STATE_CLOSING ||
! getState() == SESSION_STATE_TERMINATING ||
! getState() == SESSION_STATE_CLOSED)
! {
! break;
! }
!
! if (Log.isLogged(Log.SEV_DEBUG_VERBOSE)) {
! Log.logEntry(Log.SEV_DEBUG_VERBOSE, TCP_MAPPING,
! "Processing next frame");
! }
!
! int amountRead;
!
! try {
! do {
! amountRead =
! is.read(headerBuffer, 0, MIN_SEQ_HEADER_SIZE);
!
! if (amountRead == -1) {
! throw new SessionAbortedException();
! }
! } while (amountRead == 0);
!
! } catch (java.net.SocketException e) {
! if (getState() == SESSION_STATE_ACTIVE) {
! throw e;
! }
!
! // socket closed intentionally (session closing)
! // so just return
! return;
! }
!
! if (headerBuffer[0] == (byte) SEQ_PREFIX.charAt(0)) {
! processSEQFrame(headerBuffer, amountRead, is);
! continue;
! } else {
! processCoreFrame(headerBuffer, amountRead, is);
! }
! }
! } catch (IOException e) {
! Log.logEntry(Log.SEV_ERROR, TCP_MAPPING, e);
!
! socket = null;
!
! terminate(e.getMessage());
! } catch (SessionAbortedException e) {
! terminate("Session aborted by remote peer.");
! } catch (Throwable e) {
! Log.logEntry(Log.SEV_ERROR, TCP_MAPPING, e);
! terminate(e.getMessage());
}
! Log.logEntry(Log.SEV_DEBUG, TCP_MAPPING,
! "Session listener thread exiting. State = "
! + TCPSession.this.getState());
! }
+ private void processCoreFrame(byte[] headerBuffer, int amountRead,
+ InputStream is)
+ throws SessionAbortedException, BEEPException, IOException
+ {
+ int headerLength = 0;
+ int amountToRead = Frame.MIN_FRAME_SIZE - amountRead;
+
+ headerFound:
while (true) {
+ int tokenCount = 6;
+
try {
! int n = is.read(headerBuffer, amountRead, amountToRead);
! if (n == -1) {
throw new SessionAbortedException();
}
+
+ if (n == 0) {
+ continue;
+ }
! amountRead += n;
} catch (java.net.SocketException e) {
***************
*** 455,526 ****
}
! // socket closed intentionally (session closing) so just return
return;
}
! if (headerBuffer[length] == '\n') {
! if ((length == 0) || (headerBuffer[length - 1] != '\r')) {
! throw new BEEPException("Malformed BEEP header");
}
! --length;
! break;
}
-
- ++length;
! if (length == Frame.MAX_HEADER_SIZE) {
throw new BEEPException("Malformed BEEP header, no CRLF");
}
}
! if (Log.isLogged(Log.SEV_DEBUG)) {
! Log.logEntry(Log.SEV_DEBUG, TCP_MAPPING,
! "Processing: "
! + new String(headerBuffer, 0, length));
}
! // If this is not a SEQ frame build a <code>Frame</code> and
! // read in the payload and verify the TRAILER.
! if (headerBuffer[0] != (byte) SEQ_PREFIX.charAt(0)) {
! Frame f = super.createFrame(headerBuffer, length);
! byte[] payload = new byte[f.getSize()];
! for (int count = 0; count < payload.length; ) {
! int n = is.read(payload, count, payload.length - count);
! if (n == -1) {
! throw new SessionAbortedException();
}
! count += n;
}
! if (Log.isLogged(Log.SEV_DEBUG_VERBOSE)) {
! Log.logEntry(Log.SEV_DEBUG_VERBOSE, TCP_MAPPING,
! new String(payload));
}
! for (int i = 0; i < Frame.TRAILER.length(); ++i) {
! int b = is.read();
! if (b == -1) {
throw new SessionAbortedException();
}
! if (((byte) b) != ((byte) Frame.TRAILER.charAt(i))) {
! throw new BEEPException("Malformed BEEP frame, "
! + "invalid trailer");
}
- }
! f.addPayload(new BufferSegment(payload));
! super.postFrame(f);
! return;
}
// Process the header
StringTokenizer st = new StringTokenizer(new String(headerBuffer, 0,
! length));
if (st.countTokens() != 4) {
--- 520,670 ----
}
! // socket closed intentionally (session closing)
! // so just return
! running = false;
return;
}
! while (headerLength < amountRead) {
! if (headerBuffer[headerLength] == '\n') {
! if (headerLength == 0 ||
! headerBuffer[headerLength - 1] != '\r')
! {
! throw new BEEPException("Malformed BEEP header");
! }
!
! ++headerLength;
! break headerFound;
}
! if (headerBuffer[headerLength] == ' ') {
! if (tokenCount > 1) { // This is for ANS frames
! --tokenCount;
! }
! }
! ++headerLength;
}
! if (headerLength > Frame.MAX_HEADER_SIZE) {
throw new BEEPException("Malformed BEEP header, no CRLF");
}
+
+ /* 2 = 1 for the min token size and 1 is for the separator ' '
+ * or "\r\n"
+ */
+ amountToRead = (tokenCount * 2) + Frame.TRAILER.length();
}
! if (Log.isLogged(Log.SEV_DEBUG_VERBOSE)) {
! Log.logEntry(Log.SEV_DEBUG_VERBOSE, TCP_MAPPING,
! new String(headerBuffer, 0, headerLength));
}
! Frame f = super.createFrame(headerBuffer,
! headerLength - CRLF.length());
! byte[] payload = new byte[f.getSize()];
! int count = amountRead - headerLength;
! System.arraycopy(headerBuffer, headerLength, payload, 0, count);
!
! while (count < payload.length) {
! int n = is.read(payload, count, payload.length - count);
! if (n == -1) {
! throw new SessionAbortedException();
! }
! count += n;
! }
!
! if (Log.isLogged(Log.SEV_DEBUG_VERBOSE)) {
! Log.logEntry(Log.SEV_DEBUG_VERBOSE, TCP_MAPPING,
! new String(payload));
! }
!
! for (int i = 0; i < Frame.TRAILER.length(); ++i) {
! int b = is.read();
!
! if (b == -1) {
! throw new SessionAbortedException();
! }
!
! if (((byte) b) != ((byte) Frame.TRAILER.charAt(i))) {
! throw new BEEPException("Malformed BEEP frame, "
! + "invalid trailer");
! }
! }
!
! f.addPayload(new BufferSegment(payload));
! super.postFrame(f);
! }
!
! private void processSEQFrame(byte[] headerBuffer, int amountRead,
! InputStream is)
! throws BEEPException, IOException, SessionAbortedException
! {
! int headerLength = 0;
! int tokenCount = 4;
!
! headerFound:
! while (true) {
!
! while (headerLength < amountRead) {
! if (headerBuffer[headerLength] == '\n') {
! if (headerLength == 0 ||
! headerBuffer[headerLength - 1] != '\r')
! {
! throw new BEEPException("Malformed BEEP header");
! }
!
! ++headerLength;
! break headerFound;
}
!
! if (headerBuffer[headerLength] == ' ') {
! if (tokenCount > 1) {
! --tokenCount;
! }
! }
!
! ++headerLength;
}
! if (headerLength > Frame.MAX_HEADER_SIZE) {
! throw new BEEPException("Malformed BEEP header, no CRLF");
}
! int amountToRead = headerBuffer[headerLength - 1] == '\r' ? 1 :
! tokenCount * 2;
! try {
! /* 2 = 1 for the min token size and 1 is for the separator ' '
! * or "\r\n"
! */
! int n = is.read(headerBuffer, amountRead, amountToRead);
! if (n == -1) {
throw new SessionAbortedException();
}
! if (n == 0) {
! continue;
}
! amountRead += n;
!
! } catch (java.net.SocketException e) {
! if (getState() == SESSION_STATE_ACTIVE) {
! throw e;
! }
! // socket closed intentionally (session closing)
! // so just return
! running = false;
! return;
! }
}
// Process the header
StringTokenizer st = new StringTokenizer(new String(headerBuffer, 0,
! headerLength));
if (st.countTokens() != 4) {
***************
*** 549,588 ****
this.updatePeerReceiveBufferSize(channelNum, ackNum, window);
}
!
! private class SessionThread implements Runnable {
!
! public void run()
! {
! try {
! running = true;
!
! // Listen for and post the Greeting Frame
! while ((getState() != SESSION_STATE_ACTIVE) && running) {
! processNextFrame();
! }
!
! // Keep processing frames as long as we're active
! while ((getState() == SESSION_STATE_ACTIVE) && running) {
! processNextFrame();
! }
! } catch (IOException e) {
! Log.logEntry(Log.SEV_ERROR, TCP_MAPPING, e);
!
! socket = null;
!
! terminate(e.getMessage());
! } catch (SessionAbortedException e) {
! terminate("Session aborted by remote peer.");
! } catch (Throwable e) {
! Log.logEntry(Log.SEV_ERROR, TCP_MAPPING, e);
! terminate(e.getMessage());
! }
!
! Log.logEntry(Log.SEV_DEBUG, TCP_MAPPING,
! "Session listener thread exiting. State = "
! + TCPSession.this.getState());
! }
! }
!
private static class SessionAbortedException extends Exception {
}
--- 693,697 ----
this.updatePeerReceiveBufferSize(channelNum, ackNum, window);
}
!
private static class SessionAbortedException extends Exception {
}
|