[asycxx-devel] SF.net SVN: asycxx:[27] trunk
Status: Alpha
Brought to you by:
joe_steeve
|
From: <joe...@us...> - 2009-04-08 07:31:26
|
Revision: 27
http://asycxx.svn.sourceforge.net/asycxx/?rev=27&view=rev
Author: joe_steeve
Date: 2009-04-08 07:31:24 +0000 (Wed, 08 Apr 2009)
Log Message:
-----------
added class:StreamTransport to hold all connection oriented Transports
From: Joe Steeve <js...@hi...>
Added Paths:
-----------
trunk/include/asycxx/StreamTransport.h
trunk/src/StreamTransport.cxx
Added: trunk/include/asycxx/StreamTransport.h
===================================================================
--- trunk/include/asycxx/StreamTransport.h (rev 0)
+++ trunk/include/asycxx/StreamTransport.h 2009-04-08 07:31:24 UTC (rev 27)
@@ -0,0 +1,127 @@
+
+/********************************************************************
+ * Copyright (C) 2008,2009 HiPro IT Solutions Pvt. Ltd., Chennai. All
+ * rights reserved.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms described in the LICENSE file which accompanies
+ * this distribution. If the LICENSE file was not attached to this
+ * distribution or for further clarifications, please contact
+ * le...@hi....
+ *
+ *******************************************************************/
+
+#ifndef __HIPRO_ASYCXX__STREAM_TRANSPORT_H__
+#define __HIPRO_ASYCXX__STREAM_TRANSPORT_H__
+
+#include "Error.h"
+#include "Transport.h"
+#include "DataBuffer.h"
+#include "StreamProtocol.h"
+
+#define ASYCXX_TRANSPORT_DEFAULT_READ_BUFSIZE 1024 /* read-buffer size applied by the CTOR */
+namespace asycxx
+{
+ class StreamTransport : public Transport
+ { /* Transport for connection-oriented (stream) file-descriptors. It
+    * queues outgoing DataBuffers until the descriptor becomes writable
+    * and passes incoming data to the attached StreamProtocol. */
+ public:
+ /**
+ * \brief CTOR
+ *
+ * \param[in] reactor Pointer to a Reactor
+ * \param[in] fd A valid file-descriptor to build the transport on
+ *
+ * \details Both arguments are forwarded to the Transport base
+ * class. The read and write timeouts start at 0 and the read-buffer
+ * size at ASYCXX_TRANSPORT_DEFAULT_READ_BUFSIZE. The destructor
+ * disowns every DataBuffer that is still queued for writing.
+ */
+ StreamTransport (Reactor *reactor, int fd);
+ virtual ~StreamTransport ();
+
+ /**
+ * \brief Queues the provided data-buffer for writing
+ *
+ * \param[in] data The DataBuffer object that should be written;
+ * must not be NULL (asserted)
+ *
+ * \details This method queues the given DataBuffer to the
+ * m_writeBuffers queue. This queue is flushed out when the
+ * file-descriptor becomes writable next. This class assumes
+ * ownership of the provided DataBuffer.
+ */
+ void Write (DataBuffer *data);
+
+ /**
+ * \brief Mark the transport for destruction
+ *
+ * \details This method marks the transport for destruction. This
+ * should be used by the protocol to notify the transport to
+ * destroy itself. The Transport will do the destruction once it
+ * regains control from the event handlers that it has called: the
+ * mark is examined in Readable() after each DataAvailable()
+ * callback.
+ */
+ void loseConnection (void);
+
+ /**
+ * \brief set the size of the read-buffer
+ *
+ * \param[in] len The new read-buffer size; must not be zero
+ * (asserted)
+ *
+ * \details This method configures the size of the read-buffer to
+ * the given size. This is an upper limit on the amount of data
+ * read at any given time. If available data is more than this
+ * length, then it is read in multiple parts (chunks).
+ */
+ void configureReadBufferSize (size_t len);
+
+ /**
+ * \brief configure a timeout for the 'wait for readable'
+ * operation
+ *
+ * \note The value is only stored for now; acting on the timeout
+ * is still a TODO in the implementation.
+ */
+ void configureReadTimeout (asycxx_msecs_t timeout);
+
+ /**
+ * \brief configure a timeout for the 'wait for writable'
+ * operation
+ *
+ * \note The value is only stored for now; acting on the timeout
+ * is still a TODO in the implementation.
+ */
+ void configureWriteTimeout (asycxx_msecs_t timeout);
+
+ /**
+ * \brief Get the currently attached protocol
+ *
+ * \return The pointer held by the Transport base class, cast to
+ * StreamProtocol
+ */
+ StreamProtocol * getProtocol (void)
+ {
+ return (StreamProtocol *) Transport::getProtocol();
+ }
+
+ /**
+ * \brief Attach the given protocol to the Transport
+ *
+ * \param[in] protocol The StreamProtocol to hand to the Transport
+ * base class
+ */
+ void setProtocol (StreamProtocol *protocol)
+ {
+ Transport::setProtocol ((Protocol *)protocol);
+ }
+
+ /* Methods to provide 'Selectable' interface. Lookup 'Selectable'
+ * for documentation. Readable() and Writable() may end with the
+ * transport deleting itself, so the object must not be used after
+ * they return. */
+ virtual void Readable (void);
+ virtual void Writable (void);
+ virtual void Close (void);
+ virtual void closeReader (void);
+ virtual void closeWriter (void);
+
+ protected:
+ asycxx_msecs_t m_writeTimeout; /* the timeout used for writing */
+ asycxx_msecs_t m_readTimeout; /* the timeout used for reading */
+ size_t m_readBufSize; // max buffer size to read
+
+ std::list<DataBuffer *> m_writeBuffers; /* buffers queued by Write(), oldest first */
+ bool m_bDisconnectAndDie; /* set by loseConnection(); checked in Readable() */
+
+ /* I/O primitives used by Writable() and Readable(). They are
+ * virtual so that derived sub-classes can supply their own; the
+ * implementations in this class use write() and read() on the
+ * transport's file-descriptor. */
+ virtual RetCode doWrite (void);
+ virtual RetCode doRead (DataBuffer *data);
+ };
+}
+#endif /* __HIPRO_ASYCXX__STREAM_TRANSPORT_H__ */
+
+/*
+ Local Variables:
+ mode: c++
+ indent-tabs-mode: nil
+ tab-width: 4
+ c-file-style: "gnu"
+ End:
+*/
Added: trunk/src/StreamTransport.cxx
===================================================================
--- trunk/src/StreamTransport.cxx (rev 0)
+++ trunk/src/StreamTransport.cxx 2009-04-08 07:31:24 UTC (rev 27)
@@ -0,0 +1,311 @@
+
+/********************************************************************
+ * Copyright (C) 2008,2009 HiPro IT Solutions Pvt. Ltd., Chennai. All
+ * rights reserved.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms described in the LICENSE file which accompanies
+ * this distribution. If the LICENSE file was not attached to this
+ * distribution or for further clarifications, please contact
+ * le...@hi....
+ *
+ *******************************************************************/
+
+#ifdef HAVE_CONFIG_H
+#include <asycxx-config.h>
+#endif
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+
+#include "asycxx-common.h"
+#include <asycxx/Error.h>
+#include <asycxx/StreamTransport.h>
+#include <asycxx/StreamProtocol.h>
+
+using namespace asycxx;
+
+
+/*
+ * CTOR: builds the stream transport on top of the Transport base
+ * class and applies the default configuration.
+ */
+StreamTransport::StreamTransport (Reactor *reactor, int fd)
+  : Transport (reactor, fd),
+    m_bDisconnectAndDie (false)
+{
+  /* defaults: the stock read-buffer size and timeouts of 0 */
+  configureReadBufferSize (ASYCXX_TRANSPORT_DEFAULT_READ_BUFSIZE);
+  configureWriteTimeout (0L);
+  configureReadTimeout (0L);
+}
+
+
+/*
+ * DTOR: gives up our hold on every DataBuffer that is still waiting
+ * in the write queue, logging each one as it is dropped.
+ */
+StreamTransport::~StreamTransport ()
+{
+  typedef std::list<DataBuffer *>::iterator BufferIter;
+
+  for (BufferIter cur = m_writeBuffers.begin();
+       cur != m_writeBuffers.end(); ++cur)
+    {
+      DataBuffer *pending = *cur;
+      ERR ("Transport<%p> dumping write-buffer <%p>, len=%d",
+           this, pending->Data(), pending->Len());
+      pending->DisOwn ();
+    }
+}
+
+
+/* Set the upper bound on how much is read in one go. A zero length
+ * is rejected through ASSERT. */
+void
+StreamTransport::configureReadBufferSize (size_t len)
+{
+  ASSERT ((len != 0), "zero length read-buffer??");
+  this->m_readBufSize = len;
+}
+
+
+/* Remember the timeout for the 'wait for readable' operation.
+ *
+ * TODO: the value is only stored; a timer still has to be set up to
+ * handle the timeout and inform the protocol about it. */
+void
+StreamTransport::configureReadTimeout (asycxx_msecs_t timeout)
+{
+  this->m_readTimeout = timeout;
+}
+
+/* Remember the timeout for the 'wait for writable' operation. The
+ * value is only stored here; nothing in this method acts on it. */
+void
+StreamTransport::configureWriteTimeout (asycxx_msecs_t timeout)
+{
+  this->m_writeTimeout = timeout;
+}
+
+
+/* Queue a buffer for writing. Nothing is written here: the buffer is
+ * appended to m_writeBuffers and goes out later, when the reactor
+ * reports the file-handle as writable. */
+void
+StreamTransport::Write (DataBuffer *data)
+{
+  ASSERT ((data != NULL), "given data-buffer is NULL");
+
+  /* make sure the reactor watches us for writability */
+  startWriting();
+
+  /* the new buffer goes to the tail of the pending queue */
+  m_writeBuffers.insert (m_writeBuffers.end(), data);
+
+  /* ownership is taken only after the buffer is safely queued, so
+     that an exception raised above leaves it with the caller */
+  data->Own ();
+}
+
+
+/* Flag the transport for destruction. Only the flag is raised here;
+ * Readable() acts on it after the current DataAvailable() callback
+ * has returned. */
+void
+StreamTransport::loseConnection (void)
+{
+  this->m_bDisconnectAndDie = true;
+}
+
+
+/*
+ * Reactor callback: the transport has become readable.
+ *
+ * Data is pulled in chunks of at most m_readBufSize and every
+ * non-empty chunk is handed to the attached protocol. The method
+ * returns quietly once doRead() delivers an empty chunk. When doRead()
+ * fails, or m_bDisconnectAndDie is found set after a chunk has been
+ * delivered, the transport is closed, the protocol is told through
+ * ConnectionLost() and the object deletes itself -- it must not be
+ * touched by the caller afterwards.
+ */
+void
+StreamTransport::Readable (void)
+{
+  StreamProtocol *protocol = getProtocol ();
+  bool mustDie = false;
+
+  /** \todo: should use ioctl(m_Fd, SIOCINQ) to find the amount of
+   * unread data in the socket and then construct the DataBuffer. This
+   * should be done in the 'doRead' coz, Transport does not know what
+   * kind of file-descriptor we have with us. */
+
+  /* keep reading until the source runs dry or we have to go down */
+  while (!mustDie)
+    {
+      DataBuffer *chunk = new DataBuffer (m_readBufSize);
+
+      /* the (possibly overridden) read primitive fills the chunk */
+      RetCode rc = doRead (chunk);
+
+      if (rc != RetCode_Success)
+        {
+          /* the read failed: report it and fall through to tear-down */
+          chunk->DisOwn ();
+          protocol->ReadError ();
+          mustDie = true;
+        }
+      else if (chunk->Len() == 0)
+        {
+          /* nothing more to read for now */
+          chunk->DisOwn ();
+          return;
+        }
+      else
+        {
+          /* the protocol takes responsibility for the chunk during
+             this call; afterwards we let go of our own hold on it */
+          protocol->DataAvailable (chunk);
+          chunk->DisOwn ();
+
+          /* loseConnection() may have been requested meanwhile */
+          mustDie = m_bDisconnectAndDie;
+        }
+    }
+
+  /* scream and die */
+  Close ();
+  protocol->ConnectionLost ();
+  delete this;
+}
+
+
+/*
+ * Reactor callback: the transport has become writable.
+ *
+ * doWrite() is asked to flush the pending buffers. On RetCode_Error
+ * the protocol gets WriteError(), the transport is closed, the
+ * protocol gets ConnectionLost() and the object deletes itself. On
+ * RetCode_Success the queue is drained, so the transport stops asking
+ * the reactor for writability. Any other result leaves things as they
+ * are, to be continued on the next callback.
+ */
+void
+StreamTransport::Writable (void)
+{
+  StreamProtocol *protocol = getProtocol ();
+  RetCode outcome = doWrite ();
+
+  if (outcome == RetCode_Error)
+    {
+      /* writing failed: report, tear down and die */
+      protocol->WriteError ();
+      Close ();
+      protocol->ConnectionLost ();
+      delete this;
+      return;
+    }
+
+  if (outcome == RetCode_Success)
+    {
+      /* everything went out; no need to be woken up again */
+      stopWriting();
+    }
+}
+
+
+/*
+ * Default read primitive: fill 'data' with one read() from Fd().
+ *
+ * \param[in,out] data Buffer to read into; must not be NULL. At most
+ * data->BufferLen() is requested.
+ *
+ * \return RetCode_Success with data->Len() set to the amount read, or
+ * with data->Len() == 0 when the descriptor has nothing to deliver
+ * right now (EAGAIN / EWOULDBLOCK). RetCode_Error on end-of-file and
+ * on every other read() failure.
+ */
+RetCode
+StreamTransport::doRead (DataBuffer *data)
+{
+  ASSERT ((data != NULL), "checking parameters");
+  ssize_t rlen;
+
+  /* retry for as long as a signal interrupts the call */
+  do
+    {
+      rlen = read (Fd(), data->Data(), data->BufferLen());
+    }
+  while (rlen == -1 && errno == EINTR);
+
+  if (rlen == -1)
+    {
+      /* POSIX allows "no data on a non-blocking descriptor" to be
+         reported as either EAGAIN or EWOULDBLOCK, and the two need
+         not share a value, so both are accepted */
+      if (errno == EAGAIN || errno == EWOULDBLOCK)
+        {
+          data->Len(0);
+          return RetCode_Success;
+        }
+
+      ERR ("%s: reading from fd=%d", strerror(errno), Fd());
+      return RetCode_Error;
+    }
+
+  /* check for end-of-file */
+  if (rlen == 0)
+    {
+      ERR ("EoF on fd=%d", Fd());
+      return RetCode_Error;
+    }
+
+  data->Len(rlen);
+  return RetCode_Success;
+}
+
+
+/*
+ * Default write primitive: push the queued buffers out through
+ * write() on Fd(), oldest first.
+ *
+ * \return RetCode_Success once m_writeBuffers is empty,
+ * RetCode_Pending when the descriptor cannot take more right now
+ * (EAGAIN / EWOULDBLOCK) and buffers remain queued, RetCode_Error on
+ * any other write() failure.
+ *
+ * Each buffer records its own progress, so a partially written buffer
+ * stays at the head of the queue and is resumed from where it stopped.
+ * A fully written buffer is removed from the queue and disowned.
+ */
+RetCode
+StreamTransport::doWrite (void)
+{
+  std::list<DataBuffer *>::iterator it = m_writeBuffers.begin();
+
+  while (it != m_writeBuffers.end())
+    {
+      DataBuffer *data = *it;
+      ssize_t to_wlen = data->Len() - data->Processed();
+      void *bfr = data->UnProcessed ();
+
+      ssize_t wlen = write (Fd(), bfr, to_wlen);
+      if (wlen == -1)
+        {
+          /* interrupted by a signal: retry the very same buffer */
+          if (errno == EINTR) { continue; }
+
+          /* POSIX allows either code for "would block", and the two
+             need not share a value */
+          if (errno == EAGAIN || errno == EWOULDBLOCK)
+            {
+              return RetCode_Pending;
+            }
+
+          /* to_wlen is an ssize_t, which '%d' does not match on LP64
+             targets; it is printed as a long instead */
+          ERR ("%s: writing DataBuffer<%p>::data<%p>,len=%ld to fd=%d",
+               strerror (errno), data, bfr,
+               static_cast<long> (to_wlen), Fd());
+          return RetCode_Error;
+        }
+
+      /* record how far into this buffer we have got */
+      data->Processed((data->Processed() + wlen));
+
+      if (data->Processed() == data->Len())
+        {
+          /* buffer fully written: unlink it and move on to the next */
+          it = m_writeBuffers.erase(it);
+          data->DisOwn();
+        }
+    }
+
+  return RetCode_Success;
+}
+
+
+void
+StreamTransport::Close (void)
+{ /* Part of the 'Selectable' interface; Readable() and Writable() call
+   * it before the transport deletes itself. This implementation
+   * releases nothing and only logs that fact through ERR. */
+ ERR ("I got nothing special to close");
+}
+
+void
+StreamTransport::closeReader (void)
+{ /* Part of the 'Selectable' interface. This implementation does not
+   * close the reader-end; the request is only logged through ERR. */
+ ERR ("I dont know how to close the reader-end");
+}
+
+
+void
+StreamTransport::closeWriter (void)
+{ /* Part of the 'Selectable' interface. This implementation does not
+   * close the writer-end; the request is only logged through ERR. */
+ ERR ("I dont know how to close the writer-end");
+}
+
+/*
+ Local Variables:
+ mode: c++
+ indent-tabs-mode: nil
+ tab-width: 4
+ c-file-style: "gnu"
+ End:
+*/
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|