[asycxx-devel] SF.net SVN: asycxx:[27] trunk
Status: Alpha
Brought to you by:
joe_steeve
From: <joe...@us...> - 2009-04-08 07:31:26
|
Revision: 27 http://asycxx.svn.sourceforge.net/asycxx/?rev=27&view=rev Author: joe_steeve Date: 2009-04-08 07:31:24 +0000 (Wed, 08 Apr 2009) Log Message: ----------- added class:StreamTransport to hold all connection oriented Transports From: Joe Steeve <js...@hi...> Added Paths: ----------- trunk/include/asycxx/StreamTransport.h trunk/src/StreamTransport.cxx Added: trunk/include/asycxx/StreamTransport.h =================================================================== --- trunk/include/asycxx/StreamTransport.h (rev 0) +++ trunk/include/asycxx/StreamTransport.h 2009-04-08 07:31:24 UTC (rev 27) @@ -0,0 +1,127 @@ + +/******************************************************************** + * Copyright (C) 2008,2009 HiPro IT Solutions Pvt. Ltd., Chennai. All + * rights reserved. + * + * This program and the accompanying materials are made available + * under the terms described in the LICENSE file which accompanies + * this distribution. If the LICENSE file was not attached to this + * distribution or for further clarifications, please contact + * le...@hi.... + * + *******************************************************************/ + +#ifndef __HIPRO_ASYCXX__STREAM_TRANSPORT_H__ +#define __HIPRO_ASYCXX__STREAM_TRANSPORT_H__ + +#include "Error.h" +#include "Transport.h" +#include "DataBuffer.h" +#include "StreamProtocol.h" + +#define ASYCXX_TRANSPORT_DEFAULT_READ_BUFSIZE 1024 +namespace asycxx +{ + class StreamTransport : public Transport + { + public: + /** + * \brief CTOR + * + * \param[in] reactor Pointer to a Reactor + * \param[in] fd A valid file-descriptor to build the transport on + */ + StreamTransport (Reactor *reactor, int fd); + virtual ~StreamTransport (); + + /** + * \brief Queues the provided data-buffer for writing + * + * \param[in] data The DataBuffer object that should be written + * + * \details This method queues the given DataBuffer to the + * m_writeBuffers queue. This queue is flushed out when the + * file-descriptor becomes writable next. This class assumes + * ownership of the provided DataBuffer. + */ + void Write (DataBuffer *data); + + /** + * \brief Mark the transport for destruction + * + * \details This method marks the transport for destruction. This + * should be used by the protocol to notify the transport to + * destory itself. The Transport will do the destruction once it + * regains control from the event handlers that it has called. + */ + void loseConnection (void); + + /** + * \brief set the size of the read-buffer + * + * \details This method configures the size of the read-buffer to + * the given size. This is an upper limit on the amount of data + * read at any given time. If available data is more than this + * length, then it is read in multiple parts (chunks). + */ + void configureReadBufferSize (size_t len); + + /** + * \brief configure a timeout for the 'wait for readable' + * operation + */ + void configureReadTimeout (asycxx_msecs_t timeout); + + /** + * \brief configure a timeout for the 'wait for writable' + * operation + */ + void configureWriteTimeout (asycxx_msecs_t timeout); + + /** + * \brief Attach the given protocol to the Transport + */ + StreamProtocol * getProtocol (void) + { + return (StreamProtocol *) Transport::getProtocol(); + } + + /** + * \brief Get the currently attached protocol + */ + void setProtocol (StreamProtocol *protocol) + { + Transport::setProtocol ((Protocol *)protocol); + } + + /* Methods to provide 'Selectable' interface. Lookup 'Selectable' + * for documentation */ + virtual void Readable (void); + virtual void Writable (void); + virtual void Close (void); + virtual void closeReader (void); + virtual void closeWriter (void); + + protected: + asycxx_msecs_t m_writeTimeout; /* the timeout used for writing */ + asycxx_msecs_t m_readTimeout; /* the timeout used for reading */ + size_t m_readBufSize; // max buffer size to read + + std::list<DataBuffer *> m_writeBuffers; + bool m_bDisconnectAndDie; + + /* methods to be implemented by the derived sub-classes */ + virtual RetCode doWrite (void); + virtual RetCode doRead (DataBuffer *data); + }; +} +#endif /* __HIPRO_ASYCXX__STREAM_TRANSPORT_H__ */ + +/* + Local Variables: + mode: c++ + indent-tabs-mode: nil + tab-width: 4 + c-file-style: "gnu" + End: +*/ Added: trunk/src/StreamTransport.cxx =================================================================== --- trunk/src/StreamTransport.cxx (rev 0) +++ trunk/src/StreamTransport.cxx 2009-04-08 07:31:24 UTC (rev 27) @@ -0,0 +1,311 @@ + +/******************************************************************** + * Copyright (C) 2008,2009 HiPro IT Solutions Pvt. Ltd., Chennai. All + * rights reserved. + * + * This program and the accompanying materials are made available + * under the terms described in the LICENSE file which accompanies + * this distribution. If the LICENSE file was not attached to this + * distribution or for further clarifications, please contact + * le...@hi.... + * + *******************************************************************/ + +#ifdef HAVE_CONFIG_H +#include <asycxx-config.h> +#endif + +#include <unistd.h> +#include <errno.h> +#include <string.h> + +#include "asycxx-common.h" +#include <asycxx/Error.h> +#include <asycxx/StreamTransport.h> +#include <asycxx/StreamProtocol.h> + +using namespace asycxx; + + +/* + * CTOR + */ +StreamTransport::StreamTransport (Reactor *reactor, int fd) + : Transport (reactor, fd) +{ + m_bDisconnectAndDie = false; + + /* default configuration */ + configureReadTimeout (0L); + configureWriteTimeout (0L); + configureReadBufferSize (ASYCXX_TRANSPORT_DEFAULT_READ_BUFSIZE); +} + + +/* + * DTOR + */ +StreamTransport::~StreamTransport () +{ + DataBuffer *data; + + /* discard the DataBuffers in the pending queue */ + std::list<DataBuffer *>::iterator it; + for (it = m_writeBuffers.begin(); it != m_writeBuffers.end(); it++) + { + data = *it; + ERR ("Transport<%p> dumping write-buffer <%p>, len=%d", + this, data->Data(), data->Len()); + data->DisOwn (); + } +} + + +void +StreamTransport::configureReadBufferSize (size_t len) +{ + ASSERT ((len != 0), "zero length read-buffer??"); + m_readBufSize = len; +} + + +/* TODO: setup a timer to handle timeout and inform the protocol about + * the timeout */ +void +StreamTransport::configureReadTimeout (asycxx_msecs_t timeout) +{ + m_readTimeout = timeout; +} + +void +StreamTransport::configureWriteTimeout (asycxx_msecs_t timeout) +{ + m_writeTimeout = timeout; +} + + +/* Method to Write a buffer. This method should queue the buffer for + * writing. The actual writing should happen later when the + * file-handle becomes writable. */ +void +StreamTransport::Write (DataBuffer *data) +{ + ASSERT ((data != NULL), "given data-buffer is NULL"); + /* If we are not on the reactor's writers list, then add ourselves + to the reactor's writers list */ + startWriting(); + /* add the DataBuffer to the queue */ + m_writeBuffers.push_back (data); + /* assume ownership of the given data-buffer. we do this last so + that any exception in the middle does not cause a leak */ + data->Own (); +} + + +/* Mark the transport for destruction */ +void +StreamTransport::loseConnection (void) +{ + m_bDisconnectAndDie = true; +} + + +/* This method is called by the reactor when the transport becomes + readable */ +void +StreamTransport::Readable (void) +{ + DataBuffer *data; + RetCode ret; + StreamProtocol *proto; + proto = getProtocol (); + + /** \todo: should use ioctl(m_Fd, SIOCINQ) to find the amount of + * unread data in the socket and then construct the DataBuffer. This + * should be done in the 'doRead' coz, Transport does not know what + * kind of file-descriptor we have with us. */ + + /* Read till there is no more */ + while (1) + { + data = new DataBuffer (m_readBufSize); + + /* call the sub-classes read-method to fill up the + data-buffer */ + ret = doRead (data); + if (ret != RetCode_Success) + { + /* reading from transport failed */ + data->DisOwn (); + proto->ReadError (); + goto __handlelReadable__ScreamAndDie__; + } + + if (data->Len() == 0) + { + /* If there is no more to read, then get out */ + data->DisOwn (); + break; + } + + /* give data-buffer to the protocol. after this point, the + Protocol takes responsibility for the data-buffer object */ + proto->DataAvailable (data); + data->DisOwn (); + + /* check whether we should die */ + if (m_bDisconnectAndDie == true) + { + goto __handlelReadable__ScreamAndDie__; + } + } + return; + + __handlelReadable__ScreamAndDie__ : + Close (); + proto->ConnectionLost (); + delete this; +} + + +/* This method is called by the reactor when a 'Selectable' becomes + * writable. */ +void +StreamTransport::Writable (void) +{ + StreamProtocol *proto; + proto = getProtocol (); + RetCode ret; + + /* call the base-class method to write the pending buffers */ + ret = doWrite (); + if (ret == RetCode_Error) + { + /* writing to transport failed */ + proto->WriteError (); + goto __handlelWritable__ScreamAndDie__; + } + if (ret == RetCode_Success) + { + /* nothing more to write */ + stopWriting(); + } + return; + + __handlelWritable__ScreamAndDie__ : + Close (); + proto->ConnectionLost (); + delete this; +} + + +RetCode +StreamTransport::doRead (DataBuffer *data) +{ + ASSERT ((data != NULL), "checking parameters"); + ssize_t rlen; + + __doRead__again: + rlen = read (Fd(), data->Data(), data->BufferLen()); + if (rlen == -1) + { + if (errno == EINTR) + { + goto __doRead__again; + } + else if (errno == EAGAIN) + { + data->Len(0); + return RetCode_Success; + } + else + { + ERR ("%s: reading from fd=%d", strerror(errno), Fd()); + return RetCode_Error; + } + } + + /* check for end-of-file */ + if (rlen == 0) + { + ERR ("EoF on fd=%d", Fd()); + return RetCode_Error; + } + data->Len(rlen); + + return RetCode_Success; +} + + +RetCode +StreamTransport::doWrite (void) +{ + ssize_t wrote; + ssize_t wlen, to_wlen; + DataBuffer *data; + void *bfr; + std::list<DataBuffer *>::iterator it; + + wrote = 0; + it = m_writeBuffers.begin(); + while (it != m_writeBuffers.end()) + { + data = *it; + to_wlen = data->Len() - data->Processed(); + bfr = data->UnProcessed (); + + wlen = write (Fd(), bfr, to_wlen); + if (wlen == -1) + { + if (errno == EINTR) { continue; } + else if (errno == EAGAIN) { return RetCode_Pending; } + else + { + ERR ("%s: writing DataBuffer<%p>::data<%p>,len=%d to fd=%d", + strerror (errno), data, bfr, to_wlen, Fd()); + return RetCode_Error; + } + } + + /* update pointers */ + data->Processed((data->Processed() + wlen)); + wrote += wlen; + + if (data->Processed() == data->Len()) + { + it = m_writeBuffers.erase(it); + data->DisOwn(); + } + } + + return RetCode_Success; +} + + +void +StreamTransport::Close (void) +{ + ERR ("I got nothing special to close"); +} + +void +StreamTransport::closeReader (void) +{ + ERR ("I dont know how to close the reader-end"); +} + + +void +StreamTransport::closeWriter (void) +{ + ERR ("I dont know how to close the writer-end"); +} + +/* + Local Variables: + mode: c++ + indent-tabs-mode: nil + tab-width: 4 + c-file-style: "gnu" + End: +*/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |