[Beepcore-java-commits] CVS: beepcore-java/src/org/beepcore/beep/core InputDataStream.java,NONE,1.1
Status: Beta
Brought to you by:
huston
Update of /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core In directory usw-pr-cvs1:/tmp/cvs-serv24137/src/org/beepcore/beep/core Modified Files: ByteDataStream.java Channel.java Frame.java Message.java MessageListener.java MessageMSG.java MessageStatus.java ProfileRegistry.java Session.java StringDataStream.java TuningProfile.java Added Files: InputDataStream.java InputDataStreamAdapter.java MimeHeaders.java OutputDataStream.java Log Message: New *DataStream --- NEW FILE: InputDataStream.java --- /* * InputDataStream.java $Revision: 1.1 $ $Date: 2001/10/31 00:32:37 $ * * Copyright (c) 2001 Huston Franklin. All rights reserved. * * The contents of this file are subject to the Blocks Public License (the * "License"); You may not use this file except in compliance with the License. * * You may obtain a copy of the License at http://www.invisible.net/ * * Software distributed under the License is distributed on an "AS IS" basis, * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License * for the specific language governing rights and limitations under the * License. * */ package org.beepcore.beep.core; import java.util.Enumeration; import java.util.Iterator; import java.util.LinkedList; import org.beepcore.beep.util.BufferSegment; /** * <code>InputDataStream</code> holds a stream of * <code>BufferSegments</code>(s) and provides accessor methods to * that stream. * <p> * <b>Note that this implementation is not synchronized.</b> If * multiple threads access a <code>InputDataStream</code> * concurrently, data may be inconsistent or lost. * * @see org.beepcore.beep.util.BufferSegment * * @author Huston Franklin * @version $Revision: 1.1 $, $Date: 2001/10/31 00:32:37 $ */ public class InputDataStream { /** * Creates a <code>InputDataStream</code>. */ InputDataStream() { } /** * Creates a <code>InputDataStream</code>. * For use with in <code>Channel</code>. * * @param channel Is notified as BufferSegments are read this allows the * Channel to update the receive window. */ InputDataStream(Channel channel) { this.channel = channel; } /** * Creates a <code>InputDataStream</code>. * * @param buf The first buffer in the data stream. */ InputDataStream(BufferSegment buf) { this.add(buf); } /** * Creates a <code>InputDataStream</code>. * * @param buf The first buffer in the data stream. */ InputDataStream(BufferSegment buf, boolean complete) { this(buf); if (complete) { this.setComplete(); } } void add(BufferSegment segment) { if( this.closed) { if (this.channel != null) { this.channel.freeReceiveBufferBytes(segment.getLength()); } return; } synchronized (this.buffers) { this.buffers.addLast(segment); this.buffers.notify(); } this.availableBytes += segment.getLength(); } public int available() { return this.availableBytes; // int bytesAvailable = 0; // synchronized (this.buffers) { // Iterator i = this.buffers.iterator(); // while (i.hasNext()) { // bytesAvailable += ((BufferSegment) i.next()).getLength(); // } // } // return bytesAvailable; } /** * @todo doc */ public boolean availableSegment() { return (this.buffers.isEmpty() == false); } /** * @todo doc */ public void close() { this.closed = true; while (this.availableSegment()) { this.getNextSegment(); } } /** * @todo doc */ public InputDataStreamAdapter getInputStream() { if (stream == null) { stream = new InputDataStreamAdapter(this); } return stream; } /** * @todo doc */ public BufferSegment getNextSegment() { BufferSegment b; synchronized (buffers) { b = (BufferSegment) buffers.removeFirst(); } if (this.channel != null) { this.channel.freeReceiveBufferBytes(b.getLength()); } this.availableBytes -= b.getLength(); return b; } public BufferSegment waitForNextSegment() throws InterruptedException { synchronized (buffers) { if (availableSegment() == false) { buffers.wait(); } return getNextSegment(); } } public boolean isClosed() { return closed; } /** * Returns <code>true</code> if no more bytes will be added to * those currently available on this stream. Returns * <code>false</code> if more bytes are expected. */ public boolean isComplete() { return this.complete; } void setComplete() { this.complete = true; // @todo notify objects waiting for more buffers. } LinkedList buffers = new LinkedList(); private int availableBytes = 0; private Channel channel = null; private boolean closed = false; private boolean complete = false; private InputDataStreamAdapter stream = null; } /** * Returns the content type of a <code>InputDataStrea</code>. If * the <code>BufferSegment</code> containing the content type * hasn't been received yet, the method blocks until it is * received. * * @return Content type. * * @throws BEEPException * @deprecated */ // public String getContentType() throws BEEPException // { // return this.getInputStream().getContentType(); // } /** * Returns the value of the MIME entity header which corresponds * to the given <code>name</code>. If the <code>BufferSegment</code> * containing the content type hasn't been received yet, the * method blocks until it is received. * * @param name Name of the entity header. * @return String Value of the entity header. * * @throws BEEPException * @deprecated */ // public String getHeaderValue(String name) throws BEEPException // { // return this.getInputStream().getHeaderValue(name); // } /** * Returns an <code>Enumeration</code> of all the MIME entity * header names belonging to this <code>InputDataStream</code>. * If the <code>BufferSegment</code> containing the content type * hasn't been received yet, the method blocks until it is * received. * * @throws BEEPException * @deprecated */ // public Enumeration getHeaderNames() throws BEEPException // { // return this.getInputStream().getHeaderNames(); // } /** * Returns the transfer encoding of a <code>InputDataStrea</code>. * If the <code>BufferSegment</code> containing the content type * hasn't been received yet, the method blocks until it is * received. * * @throws BEEPException * @deprecated */ // public String getTransferEncoding() throws BEEPException // { // return this.getInputStream().getTransferEncoding(); // } --- NEW FILE: InputDataStreamAdapter.java --- package org.beepcore.beep.core; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Enumeration; import java.util.Hashtable; import org.beepcore.beep.util.BufferSegment; import org.beepcore.beep.util.Log; public class InputDataStreamAdapter extends java.io.InputStream { InputDataStreamAdapter(InputDataStream ids) { this.ids = ids; } public int available() { if (this.state != STATE_HEADERS_PARSED) { parseHeaders(); if (this.state != STATE_HEADERS_PARSED) { return 0; } } if (this.pos == this.curBuf.getLength()) { setNextBuffer(); } return (this.curBuf.getLength() - this.pos) + this.ids.available(); } public void close() { ids.close(); } /** * Returns the content type of a <code>FrameDataStrea</code>. If the * <code>Frame</code> containing the content type hasn't been received yet, * the method blocks until it is received. * * @return Content type. * * @throws BEEPException */ public String getContentType() throws BEEPException { return this.getHeaderValue(MimeHeaders.CONTENT_TYPE); } /** * Returns the value of the MIME entity header which corresponds * to the given <code>name</code>. If the <code>Frame</code> * containing the content type hasn't been received yet, the * method blocks until it is received. * * @param name Name of the entity header. * @return String Value of the entity header. * * @throws BEEPException */ public String getHeaderValue(String name) throws BEEPException { waitAvailable(); return (String) this.mimeHeaders.get(name); } /** * Returns an <code>Enumeration</code> of all the MIME entity header names * belonging to this <code>FrameDataStream</code>. If the * <code>Frame</code> containing the content type hasn't been received yet, * the method blocks until it is received. * * @throws BEEPException */ public Enumeration getHeaderNames() throws BEEPException { waitAvailable(); return this.mimeHeaders.keys(); } /** * Returns the transfer encoding of a <code>FrameDataStrea</code>. If the * <code>Frame</code> containing the content type hasn't been received yet, * the method blocks until it is received. * * @return Content type. * * @throws BEEPException */ public String getTransferEncoding() throws BEEPException { return this.getHeaderValue(MimeHeaders.CONTENT_TRANSFER_ENCODING); } public int read() throws IOException { if (waitAvailable() == -1) { return -1; } return internalRead(); } public int read(byte[] b) throws IOException { return read(b, 0, b.length); } public int read(byte[] b, int off, int len) throws IOException { if (off == -1 || len == -1 || off + len > b.length) throw new IndexOutOfBoundsException(); int bytesRead = 0; while (bytesRead < len) { if (waitAvailable() == -1) { if (bytesRead == 0) { return -1; } else { break; } } int n = internalRead(b, off + bytesRead, len - bytesRead); if (n == 0 && ids.isClosed()) { return bytesRead != 0 ? bytesRead : -1; } bytesRead += n; } return bytesRead; } public long skip(long n) throws IOException { if (n > Integer.MAX_VALUE) { return 0; } int bytesSkipped = 0; while (bytesSkipped < n && ids.isClosed() == false) { if (waitAvailable() == -1) { break; } int i = Math.min(((int) n) - bytesSkipped, curBuf.getLength() - pos); pos += i; bytesSkipped += i; } return bytesSkipped; } /** * This version of read() does not block if there are no bytes * available. */ private int internalRead() { if (setNextBuffer() == false) { return -1; } int b = curBuf.getData()[curBuf.getOffset() + pos] & 0xff; pos++; return b; } /** * This version of read(byte[], int, int) does not block if * there are no bytes available. */ private int internalRead(byte[] b, int off, int len) { int bytesRead = 0; while (bytesRead < len) { if (setNextBuffer() == false) { break; } int n = Math.min(len - bytesRead, curBuf.getLength() - pos); System.arraycopy(curBuf.getData(), curBuf.getOffset() + pos, b, off + bytesRead, n); pos += n; bytesRead += n; } return bytesRead; } private void parseHeaders() { while (true) { switch (state) { case STATE_INIT: state = STATE_PARSING_NAME; break; case STATE_PARSING_NAME: parseName(); if (state == STATE_PARSING_NAME) { return; } break; case STATE_PARSING_NAME_TERMINATOR: parseNameTerminator(); if (state == STATE_PARSING_NAME_TERMINATOR) { return; } break; case STATE_PARSING_VALUE: parseValue(); if (state == STATE_PARSING_VALUE) { return; } break; case STATE_PARSING_VALUE_TERMINATOR: parseValueTerminator(); if (state == STATE_PARSING_VALUE_TERMINATOR) { return; } break; case STATE_PARSING_HEADERS_TERMINATOR: parseHeadersTerminator(); if (state == STATE_PARSING_HEADERS_TERMINATOR) { return; } break; case STATE_HEADERS_PARSED: return; } } } private void parseHeadersTerminator() { int b = internalRead(); // move off of the LF if (b == -1) { return; } if (this.mimeHeaders.get(MimeHeaders.CONTENT_TYPE) == null) { this.mimeHeaders.put(MimeHeaders.CONTENT_TYPE, MimeHeaders.DEFAULT_CONTENT_TYPE); } if (mimeHeaders.get(MimeHeaders.CONTENT_TRANSFER_ENCODING) == null) { mimeHeaders.put(MimeHeaders.CONTENT_TRANSFER_ENCODING, MimeHeaders.DEFAULT_CONTENT_TRANSFER_ENCODING); } state = STATE_HEADERS_PARSED; } private void parseName() { int b = internalRead(); if (b == -1) { return; } if (b == CR) { state = STATE_PARSING_HEADERS_TERMINATOR; return; } name.write((byte) b); // find the ':' while (true) { b = internalRead(); if (b == -1) { return; } if (b == COLON) { state = STATE_PARSING_NAME_TERMINATOR; return; } name.write((byte) b); } } private void parseNameTerminator() { int b = internalRead(); // we are pointing to ':' move off of ' ' if (b == -1) { return; } state = STATE_PARSING_VALUE; } private void parseValue() { while (true) { int b = internalRead(); if (b == -1) { return; } if (b == CR) { state = STATE_PARSING_VALUE_TERMINATOR; return; } value.write((byte) b); } } private void parseValueTerminator() { int b = internalRead(); // move off of LF if (b == -1) { return; } try { this.mimeHeaders.put(name.toString("UTF-8"), value.toString("UTF-8")); } catch (UnsupportedEncodingException e) { throw new RuntimeException("UTF-8 not supported"); } name.reset(); value.reset(); state = STATE_PARSING_NAME; } /** * If there are no bytes remaining in the current buffer move to * the next one if it exists. */ private boolean setNextBuffer() { while (pos == curBuf.getLength()) { if (ids.availableSegment() == false) { return false; } curBuf = ids.getNextSegment(); pos = 0; } return true; } /** * Wait until available() != 0 */ private int waitAvailable() { int n; if ((n = available()) > 0) { return n; } synchronized (ids.buffers) { while ((n = available()) == 0) { if (ids.isComplete() == true) { // no more bytes to read() and none are // expected, return -1 return -1; } // no bytes available to read, but more are // expected... block try { ids.buffers.wait(); } catch (InterruptedException e) { Log.logEntry(Log.SEV_ERROR, e); } } return n; } } private static final BufferSegment zeroLength = new BufferSegment(new byte[0]); private int pos = 0; private BufferSegment curBuf = zeroLength; private static final int LF = 10; private static final int CR = 13; private static final int COLON = 58; private static final int STATE_INIT = 0; private static final int STATE_PARSING_NAME = 1; private static final int STATE_PARSING_NAME_TERMINATOR = 2; private static final int STATE_PARSING_VALUE = 3; private static final int STATE_PARSING_VALUE_TERMINATOR = 4; private static final int STATE_PARSING_HEADERS_TERMINATOR = 5; private static final int STATE_HEADERS_PARSED = 6; private int state = STATE_INIT; private ByteArrayOutputStream name = new ByteArrayOutputStream(32); private ByteArrayOutputStream value = new ByteArrayOutputStream(32); private static final int DEFAULT_HEADER_TABLE_SIZE = 2; private Hashtable mimeHeaders = new Hashtable(DEFAULT_HEADER_TABLE_SIZE); private InputDataStream ids; } --- NEW FILE: MimeHeaders.java --- package org.beepcore.beep.core; import java.util.Enumeration; import java.util.Hashtable; import org.beepcore.beep.util.BufferSegment; public class MimeHeaders { /** * The default <code>DataStream</code> content type * ("application/octet-stream"). */ public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream"; /** * The default <code>DataStream</code> content transfer encoding * ("binary"). */ public static final String DEFAULT_CONTENT_TRANSFER_ENCODING = "binary"; /** * <code>DataStream</code> content type ("application/beep+xml"); */ public static final String BEEP_XML_CONTENT_TYPE = "application/beep+xml"; /** * */ public static final String CONTENT_TYPE = "Content-Type"; /** * */ public static final String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding"; private static final String NAME_VALUE_SEPARATOR = ": "; private static final String HEADER_SUFFIX = "\r\n"; private static final int HEADER_FORMAT_LENGTH = NAME_VALUE_SEPARATOR.length() + HEADER_SUFFIX.length(); // guessing most hashtables will only contain the 2 default values private static final int DEFAULT_HEADER_TABLE_SIZE = 2; private static final int MAX_BUFFER_SIZE = 128; // length of header portion of data stream private int lenHeaders = HEADER_SUFFIX.length(); private Hashtable mimeHeadersTable = new Hashtable(DEFAULT_HEADER_TABLE_SIZE); /** * Creates <code>MimeHeaders</code> using the default content type * <code>DEFAULT_CONTENT_TYPE</code> and default content transfre encoding * <code>DEFAULT_CONTENT_TRANSFER_ENCODING</code>. */ public MimeHeaders() { this(DEFAULT_CONTENT_TYPE, DEFAULT_CONTENT_TRANSFER_ENCODING); } /** * Creates <code>MimeHeaders</code> using the specified content type and * the <code>DEFAULT_CONTENT_TRANSFER_ENCODING</code> content transfer * encoding. */ public MimeHeaders(String contentType) { this(contentType, DEFAULT_CONTENT_TRANSFER_ENCODING); } /** * Creates <code>MimeHeaders</code> using the specified content type and * content transfer encoding. */ public MimeHeaders(String contentType, String transferEncoding) { this.setContentType(contentType); this.setTransferEncoding(transferEncoding); } /** * Returns the value of the MIME entity header <code>Content-Type</code>. */ public String getContentType() { return this.getHeaderValue(CONTENT_TYPE); } /** * Retrieves the correspoding <code>value</code> to a given a MIME entity * header <code>name</code>. * * @param name Name of the MIME entity header. * @return The <code>value</code> of the MIME entity header. * * @throws BEEPException */ public String getHeaderValue(String name) { return (String) this.mimeHeadersTable.get(name); } /** * Returns an <code>Enumeration</code> of all the names of the MIME entity * headers. * Use this call in conjunction with <code>getHeaderValue</code> to iterate * through all the corresponding MIME entity header <code>value</code>(s). * * @return An <code>Enumeration</code> of all the MIME entity header * names. */ public Enumeration getHeaderNames() { return this.mimeHeadersTable.keys(); } /** * Returns the value of the MIME entity header * <code>Content-Transfer-Encoding</code>. */ public String getTransferEncoding() { return this.getHeaderValue(CONTENT_TRANSFER_ENCODING); } /** * Removes the <code>name</code> and <code>value</code> of a MIME entity * header from the data stream. Returns <code>true</code> if the * <code>name</code> was successfully removed. * * @param name Name of the header to be removed from the data stream. * * @return Returns </code>true<code> if header was removed. Otherwise, * returns <code>false</code>. */ public boolean removeHeader(String name) { String value = (String) mimeHeadersTable.get(name); /** * @todo change to not allow the removal of content-type and * transfer-encoding. */ if (value != null) { if (this.mimeHeadersTable.remove(name) != null) { if ((name.equals(CONTENT_TYPE) && value.equals(DEFAULT_CONTENT_TYPE)) || (name.equals(CONTENT_TRANSFER_ENCODING) && value.equals(DEFAULT_CONTENT_TRANSFER_ENCODING))) { // don't decrement the length, these are default values } else { this.lenHeaders -= name.length() + value.length() + this.HEADER_FORMAT_LENGTH; } return true; } } return false; } /** * Sets the content type of a <code>DataStream</code>. * * @param contentType */ public void setContentType(String contentType) { this.setHeader(CONTENT_TYPE, contentType); } /** * Adds a MIME entity header to this data stream. * * @param name Name of the MIME enitity header. * @param value Value of the MIME entity header. */ public void setHeader(String name, String value) { if (this.mimeHeadersTable.containsKey(name)) { removeHeader(name); } this.mimeHeadersTable.put(name, value); if ((name.equals(CONTENT_TYPE) && value.equals(DEFAULT_CONTENT_TYPE)) || (name.equals(CONTENT_TRANSFER_ENCODING) && value.equals(DEFAULT_CONTENT_TRANSFER_ENCODING))) { // these are default values, don't add to len } else { this.lenHeaders += name.length() + value.length() + this.HEADER_FORMAT_LENGTH; } } /** * Sets the content transfer encoding of a <code>DataStream</code> * * @param transferEncoding */ public void setTransferEncoding(String transferEncoding) { this.setHeader(CONTENT_TRANSFER_ENCODING, transferEncoding); } public BufferSegment getBufferSegment() { byte[] headersBytes = new byte[this.lenHeaders]; int offsetHeaders = 0; // read the headers Enumeration headers = mimeHeadersTable.keys(); while (headers.hasMoreElements()) { String name = (String) headers.nextElement(); String value = (String) mimeHeadersTable.get(name); if ((name.equals(CONTENT_TYPE) && value.equals(DEFAULT_CONTENT_TYPE)) || (name.equals(CONTENT_TRANSFER_ENCODING) && value.equals(DEFAULT_CONTENT_TRANSFER_ENCODING))) { // these are default values that don't need to be added // to the payload continue; } // copy name System.arraycopy(name.getBytes(), 0, headersBytes, offsetHeaders, name.length()); offsetHeaders += name.length(); // ": " System.arraycopy(NAME_VALUE_SEPARATOR.getBytes(), 0, headersBytes, offsetHeaders, NAME_VALUE_SEPARATOR.length()); offsetHeaders += NAME_VALUE_SEPARATOR.length(); // copy value System.arraycopy(value.getBytes(), 0, headersBytes, offsetHeaders, value.length()); offsetHeaders += value.length(); // CRLF System.arraycopy(this.HEADER_SUFFIX.getBytes(), 0, headersBytes, offsetHeaders, this.HEADER_SUFFIX.length()); offsetHeaders += this.HEADER_SUFFIX.length(); } // read the CRLF that separates the headers and the data System.arraycopy(this.HEADER_SUFFIX.getBytes(), 0, headersBytes, offsetHeaders, this.HEADER_SUFFIX.length()); offsetHeaders += this.HEADER_SUFFIX.length(); return new BufferSegment(headersBytes); } } --- NEW FILE: OutputDataStream.java --- /* * OutputDataStream.java $Revision: 1.1 $ $Date: 2001/10/31 00:32:37 $ * * Copyright (c) 2001 Huston Franklin. All rights reserved. * * The contents of this file are subject to the Blocks Public License (the * "License"); You may not use this file except in compliance with the License. * * You may obtain a copy of the License at http://www.invisible.net/ * * Software distributed under the License is distributed on an "AS IS" basis, * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License * for the specific language governing rights and limitations under the * License. * */ package org.beepcore.beep.core; import java.util.LinkedList; import org.beepcore.beep.util.BufferSegment; /** * <code>OutputDataStream</code> represents a BEEP message's payload as a * stream. * * @author Huston Franklin * @version $Revision: 1.1 $, $Date: 2001/10/31 00:32:37 $ */ public class OutputDataStream { /** * Creates an <code>OutputDataStream</code> without any mime * headers. It is the responsibility of the application to ensure * the mime headers exist in the first <code>BufferSegment</code> * added. */ public OutputDataStream() { this.mimeHeaders = null; } /** * Creates an <code>OutputDataStream</code> using the specified * mime headers. * * @param headers Mime headers to be prepended to the buffers in * the stream. */ protected OutputDataStream(MimeHeaders headers) { this.mimeHeaders = headers; } /** * Creates an <code>OutputDataStream</code> using the specified * mime headers. * * @param headers Mime headers to be prepended to the buffers in * the stream. */ protected OutputDataStream(MimeHeaders headers, BufferSegment buf) { this.mimeHeaders = headers; this.add(buf); } public void add(BufferSegment segment) { this.buffers.addLast(segment); } /** * @deprecated */ public void close() { this.setComplete(); } /** * Returns <code>true</code> if no more bytes will be added to * those currently available on this stream. Returns * <code>false</code> if more bytes are expected. */ public boolean isComplete() { return this.complete; } public void setComplete() { this.complete = true; } boolean availableSegment() { return (buffers.isEmpty() == false); } BufferSegment getNextSegment(int maxLength) { if (this.headersSent == false) { if (this.mimeHeaders != null) { this.buffers.addFirst(mimeHeaders.getBufferSegment()); } this.headersSent = true; } BufferSegment b = (BufferSegment)buffers.getFirst(); if (curOffset != 0 || maxLength < b.getLength()) { int origLength = b.getLength(); b = new BufferSegment(b.getData(), b.getOffset() + curOffset, Math.min(maxLength, origLength - curOffset)); if (curOffset + b.getLength() != origLength) { curOffset += b.getLength(); return b; } } buffers.removeFirst(); curOffset = 0; return b; } protected final MimeHeaders mimeHeaders; private LinkedList buffers = new LinkedList(); private boolean complete = false; private boolean headersSent = false; private int curOffset = 0; } Index: ByteDataStream.java =================================================================== RCS file: /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core/ByteDataStream.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -r1.3 -r1.4 *** ByteDataStream.java 2001/04/24 22:52:02 1.3 --- ByteDataStream.java 2001/10/31 00:32:37 1.4 *************** *** 1,7 **** - /* * ByteDataStream.java $Revision$ $Date$ * ! * Copyright (c) 2001 Invisible Worlds, Inc. All rights reserved. * * The contents of this file are subject to the Blocks Public License (the --- 1,6 ---- /* * ByteDataStream.java $Revision$ $Date$ * ! * Copyright (c) 2001 Huston Franklin. All rights reserved. * * The contents of this file are subject to the Blocks Public License (the *************** *** 18,26 **** package org.beepcore.beep.core; ! import java.io.ByteArrayInputStream; ! import java.io.InputStream; ! import java.io.IOException; ! import java.io.UnsupportedEncodingException; --- 17,24 ---- package org.beepcore.beep.core; + + import java.util.Enumeration; ! import org.beepcore.beep.util.BufferSegment; *************** *** 36,56 **** * @see org.beepcore.beep.core.DataStream * - * @author Eric Dixon * @author Huston Franklin - * @author Jay Kint - * @author Scott Pead * @version $Revision$, $Date$ */ ! public class ByteDataStream extends InputStreamDataStream { - ByteDataStream(String contentType, String transferEncoding) - { - super( contentType, transferEncoding, null ); - } - /** ! * Creates a <code>ByteDataStream</code> from a <code>byte[]</code> with a ! * content type of <code>DEFAULT_CONTENT_TYPE</code> and a transfer encoding ! * of <code>DEFAULT_CONTENT_TRANSFER_ENCODING</code>. * * @param data A <code>byte[]</code> representing a message's payload. --- 34,47 ---- * @see org.beepcore.beep.core.DataStream * * @author Huston Franklin * @version $Revision$, $Date$ */ ! public class ByteDataStream extends OutputDataStream { /** ! * Creates a <code>ByteDataStream</code> from a ! * <code>byte[]</code> with a content type of ! * <code>DEFAULT_CONTENT_TYPE</code> and a transfer encoding of ! * <code>DEFAULT_CONTENT_TRANSFER_ENCODING</code>. * * @param data A <code>byte[]</code> representing a message's payload. *************** *** 58,62 **** public ByteDataStream(byte[] data) { ! super(new ByteArrayInputStream(data)); } --- 49,53 ---- public ByteDataStream(byte[] data) { ! this(data, 0, data.length); } *************** *** 71,75 **** public ByteDataStream(String contentType, byte[] data) { ! super(contentType, new ByteArrayInputStream(data)); } --- 62,66 ---- public ByteDataStream(String contentType, byte[] data) { ! this(contentType, data, 0, data.length); } *************** *** 86,90 **** byte[] data) { ! super(contentType, transferEncoding, new ByteArrayInputStream(data)); } --- 77,81 ---- byte[] data) { ! this(contentType, transferEncoding, data, 0, data.length); } *************** *** 102,106 **** public ByteDataStream(byte[] data, int offset, int length) { ! super(new ByteArrayInputStream(data, offset, length)); } --- 93,98 ---- public ByteDataStream(byte[] data, int offset, int length) { ! super(new MimeHeaders(), ! new BufferSegment(data, offset, length)); } *************** *** 119,123 **** int length) { ! super(contentType, new ByteArrayInputStream(data, offset, length)); } --- 111,116 ---- int length) { ! super(new MimeHeaders(contentType), ! new BufferSegment(data, offset, length)); } *************** *** 138,166 **** byte[] data, int offset, int length) { ! super(contentType, transferEncoding, ! new ByteArrayInputStream(data, offset, length)); } ! void setData( byte[] data ) { ! this.data = new ByteArrayInputStream(data); } /** ! * Returns this data stream as an <code>InputStream</code> */ ! public InputStream getInputStream() { ! return this.data; } /** ! * Returns <code>true</code> if no more bytes will be added to those ! * currently available, if any, on this stream. Returns ! * <code>false</code> if more bytes are expected. */ ! public boolean isComplete() { ! return true; } } --- 131,237 ---- byte[] data, int offset, int length) { ! super(new MimeHeaders(contentType, transferEncoding), ! new BufferSegment(data, offset, length)); } ! /** ! * Returns <code>true</code> if no more bytes will be added to those ! * currently available, if any, on this stream. Returns ! * <code>false</code> if more bytes are expected. ! */ ! public boolean isComplete() { ! return true; } /** ! * @deprecated */ ! public void setContentType(String contentType) { ! this.mimeHeaders.setContentType(contentType); } /** ! * @deprecated */ ! public String getContentType() throws BEEPException { ! return this.mimeHeaders.getContentType(); ! } ! ! /** ! * @deprecated ! */ ! public void setTransferEncoding(String transferEncoding) ! { ! this.mimeHeaders.setTransferEncoding(transferEncoding); ! } ! ! /** ! * @deprecated ! */ ! public String getTransferEncoding() throws BEEPException ! { ! return this.mimeHeaders.getTransferEncoding(); ! } ! ! /** ! * Adds a MIME entity header to this data stream. ! * ! * @param name Name of the MIME enitity header. ! * @param value Value of the MIME entity header. ! * @deprecated ! */ ! public void setHeader(String name, String value) ! { ! this.mimeHeaders.setHeader(name, value); ! } ! ! /** ! * Retrieves the correspoding <code>value</code> to a given a MIME entity ! * header <code>name</code>. ! * ! * @param name Name of the MIME entity header. ! * @return The <code>value</code> of the MIME entity header. ! * ! * @throws BEEPException ! * @deprecated ! */ ! public String getHeaderValue(String name) throws BEEPException ! { ! return this.mimeHeaders.getHeaderValue(name); ! } ! ! /** ! * Returns an <code>Enumeration</code> of all the names of the MIME entity ! * headers in this data stream. ! * Use this call in conjunction with <code>getHeaderValue</code> to iterate ! * through all the corresponding MIME entity header <code>value</code>(s) ! * in this data stream. ! * ! * @return An <code>Enumeration</code> of all the MIME entity header ! * names. ! * ! * @throws BEEPException ! */ ! public Enumeration getHeaderNames() throws BEEPException ! { ! return this.mimeHeaders.getHeaderNames(); ! } ! ! /** ! * Removes the <code>name</code> and <code>value</code> of a MIME entity ! * header from the data stream. Returns <code>true</code> if the ! * <code>name</code> was successfully removed. ! * ! * @param name Name of the header to be removed from the data stream. ! * ! * @return Returns </code>true<code> if header was removed. Otherwise, ! * returns <code>false</code>. ! */ ! public boolean removeHeader(String name) ! { ! return this.mimeHeaders.removeHeader(name); } } Index: Channel.java =================================================================== RCS file: /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core/Channel.java,v retrieving revision 1.15 retrieving revision 1.16 diff -C2 -r1.15 -r1.16 *** Channel.java 2001/07/30 12:59:03 1.15 --- Channel.java 2001/10/31 00:32:37 1.16 *************** *** 22,25 **** --- 22,26 ---- import java.util.*; + import org.beepcore.beep.util.BufferSegment; import org.beepcore.beep.util.Log; *************** *** 60,63 **** --- 61,66 ---- "Reply received for a message never sent."; private static final int MAX_PAYLOAD_SIZE = 4096; + private static final BufferSegment zeroLengthSegment = + new BufferSegment(new byte[0]); /** @todo check this */ *************** *** 81,85 **** /** receiver of messages (or partial messages) */ ! private DataListener listener; /** number of last message sent */ --- 84,88 ---- /** receiver of messages (or partial messages) */ ! private MessageListener listener; /** number of last message sent */ *************** *** 98,101 **** --- 101,107 ---- private LinkedList recvMSGQueue; + /** messages queued to be sent */ + private LinkedList pendingSendMessages; + /** size of the frames */ private int frameSize; *************** *** 128,134 **** private int waitTimeForPeer; - /** semaphore for waiting until all ANS have been */ - private int msgsPending; - private boolean notifyOnFirstFrame; --- 134,137 ---- *************** *** 154,158 **** * @see org.beepcore.beep.core.DataListener */ ! protected Channel(String profile, String number, DataListener listener, Session session) { --- 157,161 ---- * @see org.beepcore.beep.core.DataListener */ ! protected Channel(String profile, String number, MessageListener listener, Session session) { *************** *** 167,170 **** --- 170,174 ---- lastMessageSent = 1; + pendingSendMessages = new LinkedList(); sentMSGQueue = Collections.synchronizedList(new LinkedList()); recvMSGQueue = new LinkedList(); *************** *** 175,179 **** prevAckno = 0; prevWindowUsed = 0; - msgsPending = 0; peerWindowSize = DEFAULT_WINDOW_SIZE; waitTimeForPeer = 0; --- 179,182 ---- *************** *** 194,199 **** // Add a MSG to the SentMSGQueue to fake channel into accepting the // greeting which comes in an unsolicited RPY. ! Message m = new Message(this, 0, null, Message.MESSAGE_TYPE_MSG); ! sentMSGQueue.add(new MessageStatus(m, rl)); state = STATE_OK; --- 197,202 ---- // Add a MSG to the SentMSGQueue to fake channel into accepting the // greeting which comes in an unsolicited RPY. ! sentMSGQueue.add(new MessageStatus(this, Message.MESSAGE_TYPE_MSG, 0, ! null, rl)); state = STATE_OK; *************** *** 347,368 **** /** ! * Sets the <code>DataListener</code> for this channel. * * @param ml */ ! public void setDataListener(DataListener ml) { ! ! // @todo can this be called consecutively with different listeners? ! // if so, should it return the old listener? ! // if not, should we prevent it? this.listener = ml; } /** * Returns the message listener for this channel. - * */ ! public DataListener getDataListener() { return this.listener; --- 350,369 ---- /** ! * Sets the <code>MessageListener</code> for this channel. * * @param ml + * @returns The previous MessageListener or null if none was set. */ ! public MessageListener setMessageListener(MessageListener ml) { ! MessageListener tmp = this.listener; this.listener = ml; + return tmp; } /** * Returns the message listener for this channel. */ ! public MessageListener getMessageListener() { return this.listener; *************** *** 379,401 **** /** - * Sends a message of type ANS. - * - * @param stream Data to send in the form of <code>DataStream</code>. - * - * @see DataStream - * @see MessageStatus - * @see #sendNUL - * - * @return MessageStatus - * - * @throws BEEPException if an error is encoutered. - * @deprecated - */ - public MessageStatus sendANS(DataStream stream) throws BEEPException - { - return this.currentMSG.sendANS(stream); - } - - /** * Sends a message of type MSG. * --- 380,383 ---- *************** *** 411,415 **** * @throws BEEPException if an error is encoutered. */ ! public MessageStatus sendMSG(DataStream stream, ReplyListener replyListener) throws BEEPException --- 393,397 ---- * @throws BEEPException if an error is encoutered. */ ! public MessageStatus sendMSG(OutputDataStream stream, ReplyListener replyListener) throws BEEPException *************** *** 431,438 **** // create a new request ! status = ! new MessageStatus(new Message(this, lastMessageSent, stream, ! Message.MESSAGE_TYPE_MSG), ! replyListener); // message 0 was the greeting, it was already sent, inc the counter --- 413,418 ---- // create a new request ! status = new MessageStatus(this, Message.MESSAGE_TYPE_MSG, ! lastMessageSent, stream, replyListener); // message 0 was the greeting, it was already sent, inc the counter *************** *** 455,622 **** /** - * Sends a message of type MSG. - * - * @param stream Data to send in the form of <code>DataStream</code>. - * @param frameListener A "one-shot" listener that will handle replies - * to this sendMSG listener. - * - * @see DataStream - * @see MessageStatus - * - * @return MessageStatus - * - * @throws BEEPException if an error is encoutered. - */ - MessageStatus sendMSG(DataStream stream, - FrameListener frameListener) - throws BEEPException - { - MessageStatus status; - - if (state != STATE_OK) { - switch (state) { - case STATE_ERROR : - throw new BEEPException(ERR_CHANNEL_ERROR_STATE); - case STATE_UNINITIALISED : - throw new BEEPException(ERR_CHANNEL_UNINITIALISED_STATE); - default : - throw new BEEPException(ERR_CHANNEL_UNKNOWN_STATE); - } - } - - synchronized (this) { - - // create a new request - status = - new MessageStatus(new Message(this, lastMessageSent, stream, - Message.MESSAGE_TYPE_MSG), - frameListener); - - // message 0 was the greeting, it was already sent, inc the counter - ++lastMessageSent; - } - - // put this in the list of messages waiting - // may want to put an expiration or something in here so they - // don't just stay around taking up space. - // @todo it's a synchronized list, you don't have to sync - synchronized (sentMSGQueue) { - sentMSGQueue.add(status); - } - - // send it on the session - sendToPeer(status); - - return status; - } - - /** - * Sends a message of type NUL. - * - * @see MessageStatus - * @see #sendANS - * - * @return MessageStatus - * - * @throws BEEPException if an error is encoutered. - * @deprecated - */ - public MessageStatus sendNUL() throws BEEPException - { - Message m = this.currentMSG; - this.currentMSG = null; - - return m.sendNUL(); - } - - /** - * Sends a message of type RPY. - * - * @param stream Data to send in the form of <code>DataStream</code>. - * - * @see DataStream - * @see MessageStatus - * - * @return MessageStatus - * - * @throws BEEPException if an error is encoutered. - * @deprecated - */ - public MessageStatus sendRPY(DataStream stream) throws BEEPException - { - Message m = this.currentMSG; - this.currentMSG = null; - - return m.sendRPY(stream); - } - - /** - * Sends a message of type ERR. - * - * @param error Error to send in the form of <code>BEEPError</code>. - * - * @see BEEPError - * @see MessageStatus - * - * @return MessageStatus - * - * @throws BEEPException if an error is encoutered. - * @deprecated - */ - public MessageStatus sendERR(BEEPError error) throws BEEPException - { - Message m = this.currentMSG; - this.currentMSG = null; - - return m.sendERR(error); - } - - /** - * Sends a message of type ERR. - * - * @param code <code>code</code> attibute in <code>error</code> element. - * @param diagnostic Message for <code>error</code> element. - * - * @see MessageStatus - * - * @return MessageStatus - * - * @throws BEEPException if an error is encoutered. - * @deprecated - */ - public MessageStatus sendERR(int code, String diagnostic) - throws BEEPException - { - Message m = this.currentMSG; - this.currentMSG = null; - - return m.sendERR(code, diagnostic); - } - - /** - * Sends a message of type ERR. - * - * @param code <code>code</code> attibute in <code>error</code> element. - * @param diagnostic Message for <code>error</code> element. - * @param xmlLang <code>xml:lang</code> attibute in <code>error</code> - * element. - * - * @see MessageStatus - * - * @return MessageStatus - * - * @throws BEEPException if an error is encoutered. - * @deprecated - */ - public MessageStatus sendERR(int code, String diagnostic, String xmlLang) - throws BEEPException - { - Message m = this.currentMSG; - this.currentMSG = null; - - return m.sendERR(code, diagnostic, xmlLang); - } - - /** * get the number of this <code>Channel</code> as a <code>String</code> * --- 435,438 ---- *************** *** 659,663 **** if (m == null) { m = new MessageMSG(this, frame.getMsgno(), ! new FrameDataStream(true)); recvMSGQueue.addLast(m); --- 475,479 ---- if (m == null) { m = new MessageMSG(this, frame.getMsgno(), ! new InputDataStream(this)); recvMSGQueue.addLast(m); *************** *** 665,670 **** } ! ((FrameDataStream) m.getDataStream()).add(frame); // The MessageListener interface only allows one message // up to be processed at a time so if this is not the --- 481,493 ---- } ! Iterator i = frame.getPayload(); ! while (i.hasNext()) { ! m.getDataStream().add((BufferSegment)i.next()); ! } + if (frame.isLast()) { + m.getDataStream().setComplete(); + } + // The MessageListener interface only allows one message // up to be processed at a time so if this is not the *************** *** 718,724 **** mstatus = (MessageStatus) sentMSGQueue.get(0); - sentMSG = mstatus.getMessage(); ! if (sentMSG.getMsgno() != frame.getMsgno()) { // @todo shutdown session (we think) --- 541,546 ---- mstatus = (MessageStatus) sentMSGQueue.get(0); ! if (mstatus.getMsgno() != frame.getMsgno()) { // @todo shutdown session (we think) *************** *** 789,793 **** if (m == null) { m = new Message(this, frame.getMsgno(), frame.getAnsno(), ! new FrameDataStream(true)); if (!frame.isLast()) { --- 611,615 ---- if (m == null) { m = new Message(this, frame.getMsgno(), frame.getAnsno(), ! new InputDataStream(this)); if (!frame.isLast()) { *************** *** 804,808 **** if (recvReplyQueue.size() == 0) { m = new Message(this, frame.getMsgno(), ! new FrameDataStream(true), frame.getMessageType()); --- 626,630 ---- if (recvReplyQueue.size() == 0) { m = new Message(this, frame.getMsgno(), ! new InputDataStream(this), frame.getMessageType()); *************** *** 831,836 **** } ! ((FrameDataStream) m.getDataStream()).add(frame); // notify message listener if this message has not been // notified before and notifyOnFirstFrame is set, the --- 653,665 ---- } ! Iterator i = frame.getPayload(); ! while (i.hasNext()) { ! m.getDataStream().add((BufferSegment)i.next()); ! } + if (frame.isLast()) { + m.getDataStream().setComplete(); + } + // notify message listener if this message has not been // notified before and notifyOnFirstFrame is set, the *************** *** 881,886 **** Message currentMessage = null; int msgno = frame.getMsgno(); - ReplyListener replyListener = null; - FrameListener frameListener = null; if (state != STATE_OK) { --- 710,713 ---- *************** *** 921,925 **** } } else { ! int expectedMsgno; synchronized (sentMSGQueue) { --- 748,752 ---- } } else { ! MessageStatus mstatus; synchronized (sentMSGQueue) { *************** *** 928,941 **** } ! MessageStatus mstatus = ! (MessageStatus) sentMSGQueue.get(0); ! expectedMsgno = mstatus.getMessage().getMsgno(); } ! if (frame.getMsgno() != expectedMsgno) { throw new BEEPException(ERR_CHANNEL_MESSAGE_NUMBER_PREFIX + frame.getMsgno() + ERR_CHANNEL_MIDDLE ! + expectedMsgno); } } --- 755,766 ---- } ! mstatus = (MessageStatus) sentMSGQueue.get(0); } ! if (frame.getMsgno() != mstatus.getMsgno()) { throw new BEEPException(ERR_CHANNEL_MESSAGE_NUMBER_PREFIX + frame.getMsgno() + ERR_CHANNEL_MIDDLE ! + mstatus.getMsgno()); } } *************** *** 970,977 **** } - if (listener instanceof FrameListener) { - frameListener = (FrameListener) this.listener; - } - if (frame.getMessageType() != Message.MESSAGE_TYPE_MSG) { MessageStatus mstatus; --- 795,798 ---- *************** *** 984,1034 **** mstatus = (MessageStatus) sentMSGQueue.get(0); ! if (mstatus.getMessage().getMsgno() != frame.getMsgno()) { throw new BEEPException("Received reply out of order"); } - - FrameListener l = mstatus.getFrameListener(); - - if (l != null) { - frameListener = l; - - // if this is the last frame and on a reply type (NUL, - // RPY, or ERR) - if ((frame.isLast() == true) - && (frame.getMessageType() - != Message.MESSAGE_TYPE_ANS)) { - sentMSGQueue.remove(0); - } - } } } - - if (frameListener != null) { - - // call the frame listener and then subtract that from the used - try { - frameListener.receiveFrame(frame); - - recvWindowUsed -= frame.getSize(); ! if (session.updateMyReceiveBufferSize(this, prevAckno, ! recvSequence, ! prevWindowUsed, ! recvWindowUsed, ! recvWindowSize)) { ! prevAckno = recvSequence; ! prevWindowUsed = recvWindowUsed; ! } ! } catch (BEEPException e) { ! // @todo change this to do the right thing ! throw new BEEPException(e.getMessage()); ! } ! } else { ! try { ! receiveFrame(frame); ! } catch (BEEPException e) { ! // @todo change this to do the right thing ! throw new BEEPException(e.getMessage()); ! } } --- 805,819 ---- mstatus = (MessageStatus) sentMSGQueue.get(0); ! if (mstatus.getMsgno() != frame.getMsgno()) { throw new BEEPException("Received reply out of order"); } } } ! try { ! receiveFrame(frame); ! } catch (BEEPException e) { ! // @todo change this to do the right thing ! throw new BEEPException(e.getMessage()); } *************** *** 1046,1050 **** } ! MessageStatus sendMessage(Message m) throws BEEPException { if (state != STATE_OK) { --- 831,835 ---- } ! void sendMessage(MessageStatus m) throws BEEPException { if (state != STATE_OK) { *************** *** 1059,1077 **** } - // create a new request - MessageStatus status = new MessageStatus(m); - // send it on the session ! sendToPeer(status); ! return status; } synchronized void sendToPeer(MessageStatus status) throws BEEPException { - Message message = status.getMessage(); Frame frame = null; int available = 0; ! DataStream stream; byte[] payload; int sessionBufferSize; --- 844,973 ---- } // send it on the session ! sendToPeer(m); ! } ! void sendToPeer(MessageStatus status) throws BEEPException ! { ! synchronized (pendingSendMessages) { ! pendingSendMessages.add(status); ! } ! sendQueuedMessages(); ! } ! ! private synchronized void sendQueuedMessages() throws BEEPException ! { ! while (true) { ! MessageStatus status; ! ! synchronized (pendingSendMessages) { ! if (pendingSendMessages.isEmpty()) { ! return; ! } ! status = (MessageStatus) pendingSendMessages.removeFirst(); ! } ! ! sendFrames(status); ! ! if (status.getMessageStatus() != ! MessageStatus.MESSAGE_STATUS_SENT) ! { ! synchronized (pendingSendMessages) { ! pendingSendMessages.addFirst(status); ! } ! return; ! } ! } ! } ! ! private void sendFrames(MessageStatus status) ! throws BEEPException ! { ! int sessionBufferSize = session.getMaxFrameSize(); ! OutputDataStream ds = status.getMessageData(); ! ! do { ! synchronized (this) { ! Frame frame; ! // create a frame ! frame = new Frame(status.getMessageType(), ! status.getChannel(), status.getMsgno(), ! false, ! sentSequence, 0, status.getAnsno()); ! ! // make sure the other peer can accept something ! if (peerWindowSize == 0) { ! try { ! wait(); ! } catch (InterruptedException e) { ! throw new RuntimeException("caught InterruptedException"); ! } ! ! // wait until there is something to send up to ! // our timeout ! if (peerWindowSize == 0) { ! throw new BEEPException("Time expired waiting " + ! "for peer."); ! } ! } ! ! int maxToSend = ! Math.min(sessionBufferSize, peerWindowSize); ! ! int size = 0; ! while (size < maxToSend) { ! ... [truncated message content] |