Thread: [Beepcore-java-commits] CVS: beepcore-java/src/org/beepcore/beep/core InputDataStream.java,NONE,1.1
Status: Beta
Brought to you by:
huston
Update of /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core In directory usw-pr-cvs1:/tmp/cvs-serv24137/src/org/beepcore/beep/core Modified Files: ByteDataStream.java Channel.java Frame.java Message.java MessageListener.java MessageMSG.java MessageStatus.java ProfileRegistry.java Session.java StringDataStream.java TuningProfile.java Added Files: InputDataStream.java InputDataStreamAdapter.java MimeHeaders.java OutputDataStream.java Log Message: New *DataStream --- NEW FILE: InputDataStream.java --- /* * InputDataStream.java $Revision: 1.1 $ $Date: 2001/10/31 00:32:37 $ * * Copyright (c) 2001 Huston Franklin. All rights reserved. * * The contents of this file are subject to the Blocks Public License (the * "License"); You may not use this file except in compliance with the License. * * You may obtain a copy of the License at http://www.invisible.net/ * * Software distributed under the License is distributed on an "AS IS" basis, * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License * for the specific language governing rights and limitations under the * License. * */ package org.beepcore.beep.core; import java.util.Enumeration; import java.util.Iterator; import java.util.LinkedList; import org.beepcore.beep.util.BufferSegment; /** * <code>InputDataStream</code> holds a stream of * <code>BufferSegments</code>(s) and provides accessor methods to * that stream. * <p> * <b>Note that this implementation is not synchronized.</b> If * multiple threads access a <code>InputDataStream</code> * concurrently, data may be inconsistent or lost. * * @see org.beepcore.beep.util.BufferSegment * * @author Huston Franklin * @version $Revision: 1.1 $, $Date: 2001/10/31 00:32:37 $ */ public class InputDataStream { /** * Creates a <code>InputDataStream</code>. */ InputDataStream() { } /** * Creates a <code>InputDataStream</code>. * For use with in <code>Channel</code>. * * @param channel Is notified as BufferSegments are read this allows the * Channel to update the receive window. */ InputDataStream(Channel channel) { this.channel = channel; } /** * Creates a <code>InputDataStream</code>. * * @param buf The first buffer in the data stream. */ InputDataStream(BufferSegment buf) { this.add(buf); } /** * Creates a <code>InputDataStream</code>. * * @param buf The first buffer in the data stream. */ InputDataStream(BufferSegment buf, boolean complete) { this(buf); if (complete) { this.setComplete(); } } void add(BufferSegment segment) { if( this.closed) { if (this.channel != null) { this.channel.freeReceiveBufferBytes(segment.getLength()); } return; } synchronized (this.buffers) { this.buffers.addLast(segment); this.buffers.notify(); } this.availableBytes += segment.getLength(); } public int available() { return this.availableBytes; // int bytesAvailable = 0; // synchronized (this.buffers) { // Iterator i = this.buffers.iterator(); // while (i.hasNext()) { // bytesAvailable += ((BufferSegment) i.next()).getLength(); // } // } // return bytesAvailable; } /** * @todo doc */ public boolean availableSegment() { return (this.buffers.isEmpty() == false); } /** * @todo doc */ public void close() { this.closed = true; while (this.availableSegment()) { this.getNextSegment(); } } /** * @todo doc */ public InputDataStreamAdapter getInputStream() { if (stream == null) { stream = new InputDataStreamAdapter(this); } return stream; } /** * @todo doc */ public BufferSegment getNextSegment() { BufferSegment b; synchronized (buffers) { b = (BufferSegment) buffers.removeFirst(); } if (this.channel != null) { this.channel.freeReceiveBufferBytes(b.getLength()); } this.availableBytes -= b.getLength(); return b; } public BufferSegment waitForNextSegment() throws InterruptedException { synchronized (buffers) { if (availableSegment() == false) { buffers.wait(); } return getNextSegment(); } } public boolean isClosed() { return closed; } /** * Returns <code>true</code> if no more bytes will be added to * those currently available on this stream. Returns * <code>false</code> if more bytes are expected. */ public boolean isComplete() { return this.complete; } void setComplete() { this.complete = true; // @todo notify objects waiting for more buffers. } LinkedList buffers = new LinkedList(); private int availableBytes = 0; private Channel channel = null; private boolean closed = false; private boolean complete = false; private InputDataStreamAdapter stream = null; } /** * Returns the content type of a <code>InputDataStrea</code>. If * the <code>BufferSegment</code> containing the content type * hasn't been received yet, the method blocks until it is * received. * * @return Content type. * * @throws BEEPException * @deprecated */ // public String getContentType() throws BEEPException // { // return this.getInputStream().getContentType(); // } /** * Returns the value of the MIME entity header which corresponds * to the given <code>name</code>. If the <code>BufferSegment</code> * containing the content type hasn't been received yet, the * method blocks until it is received. * * @param name Name of the entity header. * @return String Value of the entity header. * * @throws BEEPException * @deprecated */ // public String getHeaderValue(String name) throws BEEPException // { // return this.getInputStream().getHeaderValue(name); // } /** * Returns an <code>Enumeration</code> of all the MIME entity * header names belonging to this <code>InputDataStream</code>. * If the <code>BufferSegment</code> containing the content type * hasn't been received yet, the method blocks until it is * received. * * @throws BEEPException * @deprecated */ // public Enumeration getHeaderNames() throws BEEPException // { // return this.getInputStream().getHeaderNames(); // } /** * Returns the transfer encoding of a <code>InputDataStrea</code>. * If the <code>BufferSegment</code> containing the content type * hasn't been received yet, the method blocks until it is * received. * * @throws BEEPException * @deprecated */ // public String getTransferEncoding() throws BEEPException // { // return this.getInputStream().getTransferEncoding(); // } --- NEW FILE: InputDataStreamAdapter.java --- package org.beepcore.beep.core; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Enumeration; import java.util.Hashtable; import org.beepcore.beep.util.BufferSegment; import org.beepcore.beep.util.Log; public class InputDataStreamAdapter extends java.io.InputStream { InputDataStreamAdapter(InputDataStream ids) { this.ids = ids; } public int available() { if (this.state != STATE_HEADERS_PARSED) { parseHeaders(); if (this.state != STATE_HEADERS_PARSED) { return 0; } } if (this.pos == this.curBuf.getLength()) { setNextBuffer(); } return (this.curBuf.getLength() - this.pos) + this.ids.available(); } public void close() { ids.close(); } /** * Returns the content type of a <code>FrameDataStrea</code>. If the * <code>Frame</code> containing the content type hasn't been received yet, * the method blocks until it is received. * * @return Content type. * * @throws BEEPException */ public String getContentType() throws BEEPException { return this.getHeaderValue(MimeHeaders.CONTENT_TYPE); } /** * Returns the value of the MIME entity header which corresponds * to the given <code>name</code>. If the <code>Frame</code> * containing the content type hasn't been received yet, the * method blocks until it is received. * * @param name Name of the entity header. * @return String Value of the entity header. * * @throws BEEPException */ public String getHeaderValue(String name) throws BEEPException { waitAvailable(); return (String) this.mimeHeaders.get(name); } /** * Returns an <code>Enumeration</code> of all the MIME entity header names * belonging to this <code>FrameDataStream</code>. If the * <code>Frame</code> containing the content type hasn't been received yet, * the method blocks until it is received. * * @throws BEEPException */ public Enumeration getHeaderNames() throws BEEPException { waitAvailable(); return this.mimeHeaders.keys(); } /** * Returns the transfer encoding of a <code>FrameDataStrea</code>. If the * <code>Frame</code> containing the content type hasn't been received yet, * the method blocks until it is received. * * @return Content type. * * @throws BEEPException */ public String getTransferEncoding() throws BEEPException { return this.getHeaderValue(MimeHeaders.CONTENT_TRANSFER_ENCODING); } public int read() throws IOException { if (waitAvailable() == -1) { return -1; } return internalRead(); } public int read(byte[] b) throws IOException { return read(b, 0, b.length); } public int read(byte[] b, int off, int len) throws IOException { if (off == -1 || len == -1 || off + len > b.length) throw new IndexOutOfBoundsException(); int bytesRead = 0; while (bytesRead < len) { if (waitAvailable() == -1) { if (bytesRead == 0) { return -1; } else { break; } } int n = internalRead(b, off + bytesRead, len - bytesRead); if (n == 0 && ids.isClosed()) { return bytesRead != 0 ? bytesRead : -1; } bytesRead += n; } return bytesRead; } public long skip(long n) throws IOException { if (n > Integer.MAX_VALUE) { return 0; } int bytesSkipped = 0; while (bytesSkipped < n && ids.isClosed() == false) { if (waitAvailable() == -1) { break; } int i = Math.min(((int) n) - bytesSkipped, curBuf.getLength() - pos); pos += i; bytesSkipped += i; } return bytesSkipped; } /** * This version of read() does not block if there are no bytes * available. */ private int internalRead() { if (setNextBuffer() == false) { return -1; } int b = curBuf.getData()[curBuf.getOffset() + pos] & 0xff; pos++; return b; } /** * This version of read(byte[], int, int) does not block if * there are no bytes available. */ private int internalRead(byte[] b, int off, int len) { int bytesRead = 0; while (bytesRead < len) { if (setNextBuffer() == false) { break; } int n = Math.min(len - bytesRead, curBuf.getLength() - pos); System.arraycopy(curBuf.getData(), curBuf.getOffset() + pos, b, off + bytesRead, n); pos += n; bytesRead += n; } return bytesRead; } private void parseHeaders() { while (true) { switch (state) { case STATE_INIT: state = STATE_PARSING_NAME; break; case STATE_PARSING_NAME: parseName(); if (state == STATE_PARSING_NAME) { return; } break; case STATE_PARSING_NAME_TERMINATOR: parseNameTerminator(); if (state == STATE_PARSING_NAME_TERMINATOR) { return; } break; case STATE_PARSING_VALUE: parseValue(); if (state == STATE_PARSING_VALUE) { return; } break; case STATE_PARSING_VALUE_TERMINATOR: parseValueTerminator(); if (state == STATE_PARSING_VALUE_TERMINATOR) { return; } break; case STATE_PARSING_HEADERS_TERMINATOR: parseHeadersTerminator(); if (state == STATE_PARSING_HEADERS_TERMINATOR) { return; } break; case STATE_HEADERS_PARSED: return; } } } private void parseHeadersTerminator() { int b = internalRead(); // move off of the LF if (b == -1) { return; } if (this.mimeHeaders.get(MimeHeaders.CONTENT_TYPE) == null) { this.mimeHeaders.put(MimeHeaders.CONTENT_TYPE, MimeHeaders.DEFAULT_CONTENT_TYPE); } if (mimeHeaders.get(MimeHeaders.CONTENT_TRANSFER_ENCODING) == null) { mimeHeaders.put(MimeHeaders.CONTENT_TRANSFER_ENCODING, MimeHeaders.DEFAULT_CONTENT_TRANSFER_ENCODING); } state = STATE_HEADERS_PARSED; } private void parseName() { int b = internalRead(); if (b == -1) { return; } if (b == CR) { state = STATE_PARSING_HEADERS_TERMINATOR; return; } name.write((byte) b); // find the ':' while (true) { b = internalRead(); if (b == -1) { return; } if (b == COLON) { state = STATE_PARSING_NAME_TERMINATOR; return; } name.write((byte) b); } } private void parseNameTerminator() { int b = internalRead(); // we are pointing to ':' move off of ' ' if (b == -1) { return; } state = STATE_PARSING_VALUE; } private void parseValue() { while (true) { int b = internalRead(); if (b == -1) { return; } if (b == CR) { state = STATE_PARSING_VALUE_TERMINATOR; return; } value.write((byte) b); } } private void parseValueTerminator() { int b = internalRead(); // move off of LF if (b == -1) { return; } try { this.mimeHeaders.put(name.toString("UTF-8"), value.toString("UTF-8")); } catch (UnsupportedEncodingException e) { throw new RuntimeException("UTF-8 not supported"); } name.reset(); value.reset(); state = STATE_PARSING_NAME; } /** * If there are no bytes remaining in the current buffer move to * the next one if it exists. */ private boolean setNextBuffer() { while (pos == curBuf.getLength()) { if (ids.availableSegment() == false) { return false; } curBuf = ids.getNextSegment(); pos = 0; } return true; } /** * Wait until available() != 0 */ private int waitAvailable() { int n; if ((n = available()) > 0) { return n; } synchronized (ids.buffers) { while ((n = available()) == 0) { if (ids.isComplete() == true) { // no more bytes to read() and none are // expected, return -1 return -1; } // no bytes available to read, but more are // expected... block try { ids.buffers.wait(); } catch (InterruptedException e) { Log.logEntry(Log.SEV_ERROR, e); } } return n; } } private static final BufferSegment zeroLength = new BufferSegment(new byte[0]); private int pos = 0; private BufferSegment curBuf = zeroLength; private static final int LF = 10; private static final int CR = 13; private static final int COLON = 58; private static final int STATE_INIT = 0; private static final int STATE_PARSING_NAME = 1; private static final int STATE_PARSING_NAME_TERMINATOR = 2; private static final int STATE_PARSING_VALUE = 3; private static final int STATE_PARSING_VALUE_TERMINATOR = 4; private static final int STATE_PARSING_HEADERS_TERMINATOR = 5; private static final int STATE_HEADERS_PARSED = 6; private int state = STATE_INIT; private ByteArrayOutputStream name = new ByteArrayOutputStream(32); private ByteArrayOutputStream value = new ByteArrayOutputStream(32); private static final int DEFAULT_HEADER_TABLE_SIZE = 2; private Hashtable mimeHeaders = new Hashtable(DEFAULT_HEADER_TABLE_SIZE); private InputDataStream ids; } --- NEW FILE: MimeHeaders.java --- package org.beepcore.beep.core; import java.util.Enumeration; import java.util.Hashtable; import org.beepcore.beep.util.BufferSegment; public class MimeHeaders { /** * The default <code>DataStream</code> content type * ("application/octet-stream"). */ public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream"; /** * The default <code>DataStream</code> content transfer encoding * ("binary"). */ public static final String DEFAULT_CONTENT_TRANSFER_ENCODING = "binary"; /** * <code>DataStream</code> content type ("application/beep+xml"); */ public static final String BEEP_XML_CONTENT_TYPE = "application/beep+xml"; /** * */ public static final String CONTENT_TYPE = "Content-Type"; /** * */ public static final String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding"; private static final String NAME_VALUE_SEPARATOR = ": "; private static final String HEADER_SUFFIX = "\r\n"; private static final int HEADER_FORMAT_LENGTH = NAME_VALUE_SEPARATOR.length() + HEADER_SUFFIX.length(); // guessing most hashtables will only contain the 2 default values private static final int DEFAULT_HEADER_TABLE_SIZE = 2; private static final int MAX_BUFFER_SIZE = 128; // length of header portion of data stream private int lenHeaders = HEADER_SUFFIX.length(); private Hashtable mimeHeadersTable = new Hashtable(DEFAULT_HEADER_TABLE_SIZE); /** * Creates <code>MimeHeaders</code> using the default content type * <code>DEFAULT_CONTENT_TYPE</code> and default content transfre encoding * <code>DEFAULT_CONTENT_TRANSFER_ENCODING</code>. */ public MimeHeaders() { this(DEFAULT_CONTENT_TYPE, DEFAULT_CONTENT_TRANSFER_ENCODING); } /** * Creates <code>MimeHeaders</code> using the specified content type and * the <code>DEFAULT_CONTENT_TRANSFER_ENCODING</code> content transfer * encoding. */ public MimeHeaders(String contentType) { this(contentType, DEFAULT_CONTENT_TRANSFER_ENCODING); } /** * Creates <code>MimeHeaders</code> using the specified content type and * content transfer encoding. */ public MimeHeaders(String contentType, String transferEncoding) { this.setContentType(contentType); this.setTransferEncoding(transferEncoding); } /** * Returns the value of the MIME entity header <code>Content-Type</code>. */ public String getContentType() { return this.getHeaderValue(CONTENT_TYPE); } /** * Retrieves the correspoding <code>value</code> to a given a MIME entity * header <code>name</code>. * * @param name Name of the MIME entity header. * @return The <code>value</code> of the MIME entity header. * * @throws BEEPException */ public String getHeaderValue(String name) { return (String) this.mimeHeadersTable.get(name); } /** * Returns an <code>Enumeration</code> of all the names of the MIME entity * headers. * Use this call in conjunction with <code>getHeaderValue</code> to iterate * through all the corresponding MIME entity header <code>value</code>(s). * * @return An <code>Enumeration</code> of all the MIME entity header * names. */ public Enumeration getHeaderNames() { return this.mimeHeadersTable.keys(); } /** * Returns the value of the MIME entity header * <code>Content-Transfer-Encoding</code>. */ public String getTransferEncoding() { return this.getHeaderValue(CONTENT_TRANSFER_ENCODING); } /** * Removes the <code>name</code> and <code>value</code> of a MIME entity * header from the data stream. Returns <code>true</code> if the * <code>name</code> was successfully removed. * * @param name Name of the header to be removed from the data stream. * * @return Returns </code>true<code> if header was removed. Otherwise, * returns <code>false</code>. */ public boolean removeHeader(String name) { String value = (String) mimeHeadersTable.get(name); /** * @todo change to not allow the removal of content-type and * transfer-encoding. */ if (value != null) { if (this.mimeHeadersTable.remove(name) != null) { if ((name.equals(CONTENT_TYPE) && value.equals(DEFAULT_CONTENT_TYPE)) || (name.equals(CONTENT_TRANSFER_ENCODING) && value.equals(DEFAULT_CONTENT_TRANSFER_ENCODING))) { // don't decrement the length, these are default values } else { this.lenHeaders -= name.length() + value.length() + this.HEADER_FORMAT_LENGTH; } return true; } } return false; } /** * Sets the content type of a <code>DataStream</code>. * * @param contentType */ public void setContentType(String contentType) { this.setHeader(CONTENT_TYPE, contentType); } /** * Adds a MIME entity header to this data stream. * * @param name Name of the MIME enitity header. * @param value Value of the MIME entity header. */ public void setHeader(String name, String value) { if (this.mimeHeadersTable.containsKey(name)) { removeHeader(name); } this.mimeHeadersTable.put(name, value); if ((name.equals(CONTENT_TYPE) && value.equals(DEFAULT_CONTENT_TYPE)) || (name.equals(CONTENT_TRANSFER_ENCODING) && value.equals(DEFAULT_CONTENT_TRANSFER_ENCODING))) { // these are default values, don't add to len } else { this.lenHeaders += name.length() + value.length() + this.HEADER_FORMAT_LENGTH; } } /** * Sets the content transfer encoding of a <code>DataStream</code> * * @param transferEncoding */ public void setTransferEncoding(String transferEncoding) { this.setHeader(CONTENT_TRANSFER_ENCODING, transferEncoding); } public BufferSegment getBufferSegment() { byte[] headersBytes = new byte[this.lenHeaders]; int offsetHeaders = 0; // read the headers Enumeration headers = mimeHeadersTable.keys(); while (headers.hasMoreElements()) { String name = (String) headers.nextElement(); String value = (String) mimeHeadersTable.get(name); if ((name.equals(CONTENT_TYPE) && value.equals(DEFAULT_CONTENT_TYPE)) || (name.equals(CONTENT_TRANSFER_ENCODING) && value.equals(DEFAULT_CONTENT_TRANSFER_ENCODING))) { // these are default values that don't need to be added // to the payload continue; } // copy name System.arraycopy(name.getBytes(), 0, headersBytes, offsetHeaders, name.length()); offsetHeaders += name.length(); // ": " System.arraycopy(NAME_VALUE_SEPARATOR.getBytes(), 0, headersBytes, offsetHeaders, NAME_VALUE_SEPARATOR.length()); offsetHeaders += NAME_VALUE_SEPARATOR.length(); // copy value System.arraycopy(value.getBytes(), 0, headersBytes, offsetHeaders, value.length()); offsetHeaders += value.length(); // CRLF System.arraycopy(this.HEADER_SUFFIX.getBytes(), 0, headersBytes, offsetHeaders, this.HEADER_SUFFIX.length()); offsetHeaders += this.HEADER_SUFFIX.length(); } // read the CRLF that separates the headers and the data System.arraycopy(this.HEADER_SUFFIX.getBytes(), 0, headersBytes, offsetHeaders, this.HEADER_SUFFIX.length()); offsetHeaders += this.HEADER_SUFFIX.length(); return new BufferSegment(headersBytes); } } --- NEW FILE: OutputDataStream.java --- /* * OutputDataStream.java $Revision: 1.1 $ $Date: 2001/10/31 00:32:37 $ * * Copyright (c) 2001 Huston Franklin. All rights reserved. * * The contents of this file are subject to the Blocks Public License (the * "License"); You may not use this file except in compliance with the License. * * You may obtain a copy of the License at http://www.invisible.net/ * * Software distributed under the License is distributed on an "AS IS" basis, * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License * for the specific language governing rights and limitations under the * License. * */ package org.beepcore.beep.core; import java.util.LinkedList; import org.beepcore.beep.util.BufferSegment; /** * <code>OutputDataStream</code> represents a BEEP message's payload as a * stream. * * @author Huston Franklin * @version $Revision: 1.1 $, $Date: 2001/10/31 00:32:37 $ */ public class OutputDataStream { /** * Creates an <code>OutputDataStream</code> without any mime * headers. It is the responsibility of the application to ensure * the mime headers exist in the first <code>BufferSegment</code> * added. */ public OutputDataStream() { this.mimeHeaders = null; } /** * Creates an <code>OutputDataStream</code> using the specified * mime headers. * * @param headers Mime headers to be prepended to the buffers in * the stream. */ protected OutputDataStream(MimeHeaders headers) { this.mimeHeaders = headers; } /** * Creates an <code>OutputDataStream</code> using the specified * mime headers. * * @param headers Mime headers to be prepended to the buffers in * the stream. */ protected OutputDataStream(MimeHeaders headers, BufferSegment buf) { this.mimeHeaders = headers; this.add(buf); } public void add(BufferSegment segment) { this.buffers.addLast(segment); } /** * @deprecated */ public void close() { this.setComplete(); } /** * Returns <code>true</code> if no more bytes will be added to * those currently available on this stream. Returns * <code>false</code> if more bytes are expected. */ public boolean isComplete() { return this.complete; } public void setComplete() { this.complete = true; } boolean availableSegment() { return (buffers.isEmpty() == false); } BufferSegment getNextSegment(int maxLength) { if (this.headersSent == false) { if (this.mimeHeaders != null) { this.buffers.addFirst(mimeHeaders.getBufferSegment()); } this.headersSent = true; } BufferSegment b = (BufferSegment)buffers.getFirst(); if (curOffset != 0 || maxLength < b.getLength()) { int origLength = b.getLength(); b = new BufferSegment(b.getData(), b.getOffset() + curOffset, Math.min(maxLength, origLength - curOffset)); if (curOffset + b.getLength() != origLength) { curOffset += b.getLength(); return b; } } buffers.removeFirst(); curOffset = 0; return b; } protected final MimeHeaders mimeHeaders; private LinkedList buffers = new LinkedList(); private boolean complete = false; pr... [truncated message content] |