[asycxx-devel] SF.net SVN: asycxx:[57] trunk
Status: Alpha
Brought to you by:
joe_steeve
From: <viv...@us...> - 2009-04-20 09:28:39
|
Revision: 57 http://asycxx.svn.sourceforge.net/asycxx/?rev=57&view=rev Author: vivekanand83 Date: 2009-04-20 09:28:37 +0000 (Mon, 20 Apr 2009) Log Message: ----------- Added threading support. Modified Paths: -------------- trunk/configure.ac trunk/examples/Makefile.am trunk/examples/echoserver/Makefile.am trunk/include/asycxx/Transport.h trunk/src/Makefile.am Added Paths: ----------- trunk/examples/threadtest/ trunk/examples/threadtest/Makefile.am trunk/examples/threadtest/Threadtest.cxx trunk/include/asycxx/Thread.h trunk/src/Thread.cxx Modified: trunk/configure.ac =================================================================== --- trunk/configure.ac 2009-04-16 13:24:59 UTC (rev 56) +++ trunk/configure.ac 2009-04-20 09:28:37 UTC (rev 57) @@ -71,6 +71,7 @@ examples/Makefile examples/echoserver/Makefile examples/canserver/Makefile +examples/threadtest/Makefile asycxx-0.1.pc ]) AC_OUTPUT Modified: trunk/examples/Makefile.am =================================================================== --- trunk/examples/Makefile.am 2009-04-16 13:24:59 UTC (rev 56) +++ trunk/examples/Makefile.am 2009-04-20 09:28:37 UTC (rev 57) @@ -1,2 +1,2 @@ -SUBDIRS = echoserver #canserver +SUBDIRS = echoserver threadtest #canserver Modified: trunk/examples/echoserver/Makefile.am =================================================================== --- trunk/examples/echoserver/Makefile.am 2009-04-16 13:24:59 UTC (rev 56) +++ trunk/examples/echoserver/Makefile.am 2009-04-20 09:28:37 UTC (rev 57) @@ -4,7 +4,7 @@ EchoServer_SOURCES = EchoServer.cxx EchoServer_LDADD = $(top_srcdir)/src/libasycxx-0.1.la EchoServer_CPPFLAGS = -I$(top_srcdir)/include -EchoServer_LDFLAGS = -L$(top_srcdir)/src +EchoServer_LDFLAGS = -L$(top_srcdir)/src -lpthread EchoServer_debug_SOURCES = $(EchoServer_SOURCES) EchoServer_debug_CXXFLAGS = -g $(AM_CXXFLAGS) Added: trunk/examples/threadtest/Makefile.am =================================================================== --- trunk/examples/threadtest/Makefile.am (rev 0) +++ trunk/examples/threadtest/Makefile.am 2009-04-20 09:28:37 UTC (rev 57) @@ -0,0 +1,19 @@ + +noinst_PROGRAMS = Threadtest Threadtest.debug + +Threadtest_SOURCES = Threadtest.cxx +Threadtest_LDADD = $(top_srcdir)/src/libasycxx-0.1.la -lpthread +Threadtest_CPPFLAGS = -I$(top_srcdir)/include +Threadtest_LDFLAGS = -L$(top_srcdir)/src + +Threadtest_debug_SOURCES = $(Threadtest_SOURCES) +Threadtest_debug_CXXFLAGS = -g $(AM_CXXFLAGS) +Threadtest_debug_LDADD = $(top_srcdir)/src/.libs/libasycxx-dbg-0.1.a -lpthread +Threadtest_debug_CPPFLAGS = -I$(top_srcdir)/include +#Threadtest_debug_LDFLAGS = -L$(top_srcdir)/src + +# Local Variables: +# mode: makefile +# indent-tabs-mode: nil +# tab-width: 4 +# End: Added: trunk/examples/threadtest/Threadtest.cxx =================================================================== --- trunk/examples/threadtest/Threadtest.cxx (rev 0) +++ trunk/examples/threadtest/Threadtest.cxx 2009-04-20 09:28:37 UTC (rev 57) @@ -0,0 +1,67 @@ + +/******************************************************************** + * Copyright (C) 2008,2009 HiPro IT Solutions Pvt. Ltd., Chennai. All + * rights reserved. + * + * This program and the accompanying materials are made available + * under the terms described in the LICENSE file which accompanies + * this distribution. If the LICENSE file was not attached to this + * distribution or for further clarifications, please contact + * le...@hi.... + * + *******************************************************************/ + +#include <memory> + +#include <asycxx/Logger.h> +#include <asycxx/Error.h> +#include <asycxx/SelectReactor.h> +#include <asycxx/StreamProtocol.h> +#include <asycxx/StreamProtocolFactory.h> +#include <asycxx/TCPListener.h> +#include <asycxx/Thread.h> +#include <asycxx/DataBuffer.h> + +using namespace asycxx; + +Logger *Log; + +static DataBuffer* threadFunction(void *objthread, DataBuffer *obj) +{ + fprintf(stderr,"Thread function called!\n"); + return NULL; +} + +static void CallbackFunc(void *obj, DataBuffer *param) +{ + fprintf(stderr,"CallBack function called.\n"); +} + + + +int main (void) +{ + Reactor *reactor; + + Log = new Logger(); + Log->EnableConsole(); + + reactor = new SelectReactor (); + Thread *thread; + thread = new Thread(reactor, &threadFunction, + NULL, NULL, &CallbackFunc, + NULL); + reactor->Run (); + printf ("Closing reactor"); +} + +/* + Local Variables: + mode: c++ + indent-tabs-mode: nil + tab-width: 4 + c-file-style: "gnu" + End: +*/ + + Added: trunk/include/asycxx/Thread.h =================================================================== --- trunk/include/asycxx/Thread.h (rev 0) +++ trunk/include/asycxx/Thread.h 2009-04-20 09:28:37 UTC (rev 57) @@ -0,0 +1,108 @@ + +/******************************************************************** + * Copyright (C) 2008,2009 HiPro IT Solutions Pvt. Ltd., Chennai. All + * rights reserved. + * + * This program and the accompanying materials are made available + * under the terms described in the LICENSE file which accompanies + * this distribution. If the LICENSE file was not attached to this + * distribution or for further clarifications, please contact + * le...@hi.... + * + *******************************************************************/ + +#ifndef __HIPRO_ASYCXX__THREAD_H__ +#define __HIPRO_ASYCXX__THREAD_H__ + +#include <unistd.h> +#include <errno.h> +#include <string.h> + +#include "Error.h" +#include "StreamTransport.h" +#include "StreamProtocol.h" +#include "StreamProtocolFactory.h" + +namespace asycxx +{ + typedef DataBuffer* (*ThreadFunction_t)(void* objThread, DataBuffer* obj); + typedef void (*CallbackFunction_t)(void* obj, DataBuffer *param); + + class Thread + { + enum ThreadStatus + { + ThreadComplete = 0, + ThreadKilled + }; + + public: + Thread(Reactor *reactor, ThreadFunction_t thread_fn, + void *objThread, DataBuffer *data, + CallbackFunction_t cb_fn, + void *objCB); + pthread_t getThreadId(void) { return m_ThreadId; } + + void SetKill(){ m_KillThread = true; } + bool IsKillThreadSet() { return m_KillThread; } + void Die(); + + private: + // PipeProtocol class + class PipeProtocol: public StreamProtocol + { + public: + PipeProtocol (Reactor *reactor,StreamProtocolFactory *factory, + StreamTransport *transport, Thread *pthis); + + virtual ~PipeProtocol () {} + + /** + * \brief get the current Transport + */ + virtual StreamTransport* GetTransport (void) { return m_Transport; } + virtual void DataAvailable(DataBuffer *data); + + private: + Thread *m_Thread; + StreamTransport * m_Transport; + }; + + //PipeProtocolFactory class + class PipeProtocolFactory:StreamProtocolFactory + { + public: + PipeProtocolFactory (Reactor *reactor, Thread *pthis) : + StreamProtocolFactory (reactor) + { + m_Thread = pthis; + } + ~PipeProtocolFactory () {} + PipeProtocol *GetProtocol (StreamTransport *trans) + { + return new PipeProtocol (m_Reactor, this, trans, m_Thread); + } + + private: + Thread *m_Thread; + }; + + private: + ~Thread(); + static void *ThreadFunction(void *param); + pthread_t m_ThreadId; + int m_PipeFd[2]; + ThreadFunction_t m_ThreadFunc; + CallbackFunction_t m_ThreadCB; + void *m_ObjThread; + void *m_ObjCallBack; + DataBuffer *m_ObjData; + PipeProtocol *m_PipeProtocol; + DataBuffer *m_ThreadRetDB; + PipeProtocolFactory *m_PipeProtocolFactory; + StreamTransport *m_StreamTransport; + bool m_KillThread; + }; +} + +#endif //__HIPRO_ASYCXX__THREAD_H__ Modified: trunk/include/asycxx/Transport.h =================================================================== --- trunk/include/asycxx/Transport.h 2009-04-16 13:24:59 UTC (rev 56) +++ trunk/include/asycxx/Transport.h 2009-04-20 09:28:37 UTC (rev 57) @@ -40,22 +40,22 @@ /** * \brief Sets the transport ready for reading */ - void startReading (void); + virtual void startReading (void); /** * \brief Sets the transport to stop reading */ - void stopReading (void); + virtual void stopReading (void); /** * \brief Sets the transport ready for writing */ - void startWriting (void); + virtual void startWriting (void); /** * \brief Sets the transport to stop writing */ - void stopWriting (void); + virtual void stopWriting (void); /** * \brief Attach the given protocol to the Transport Modified: trunk/src/Makefile.am =================================================================== --- trunk/src/Makefile.am 2009-04-16 13:24:59 UTC (rev 56) +++ trunk/src/Makefile.am 2009-04-20 09:28:37 UTC (rev 57) @@ -22,7 +22,8 @@ GimpleMsgBus.cxx \ core.cxx \ RawCANTransport.cxx \ - RawCANListener.cxx + RawCANListener.cxx \ + Thread.cxx libasycxx_0_1_la_CPPFLAGS = -I$(top_srcdir)/include Added: trunk/src/Thread.cxx =================================================================== --- trunk/src/Thread.cxx (rev 0) +++ trunk/src/Thread.cxx 2009-04-20 09:28:37 UTC (rev 57) @@ -0,0 +1,160 @@ +/******************************************************************** + * Copyright (C) 2008,2009 HiPro IT Solutions Pvt. Ltd., Chennai. All + * rights reserved. + * + * This program and the accompanying materials are made available + * under the terms described in the LICENSE file which accompanies + * this distribution. If the LICENSE file was not attached to this + * distribution or for further clarifications, please contact + * le...@hi.... + * + *******************************************************************/ + +#ifdef HAVE_CONFIG_H +#include <asycxx-config.h> +#endif + +#include <errno.h> +#include <unistd.h> +#include <string.h> +#include <deque> + +#include <sys/socket.h> +#include <errno.h> +#include <fcntl.h> + + +#include "asycxx-common.h" +#include <asycxx/Error.h> +#include <asycxx/Protocol.h> +#include <asycxx/Thread.h> + +using namespace asycxx; +/** + *Ctor + */ +Thread::Thread(Reactor *reactor, ThreadFunction_t thread_fn, + void *objThread, DataBuffer *data, + CallbackFunction_t cb_fn, + void *objCB) +{ + int ret = pipe(m_PipeFd); + ASSERT((ret == 0), "Pipe creation for thread communication failed."); + + SetFDAsNonBlocking(m_PipeFd[0]); + SetFDAsNonBlocking(m_PipeFd[1]); + m_PipeProtocolFactory = new PipeProtocolFactory(reactor, this); + + /* create a transport */ + m_StreamTransport = new StreamTransport (reactor, m_PipeFd[0]); + m_PipeProtocol = m_PipeProtocolFactory->GetProtocol(m_StreamTransport); + m_StreamTransport->setProtocol (m_PipeProtocol); + + m_ThreadFunc = thread_fn; + m_ThreadCB = cb_fn; + m_ObjThread = objThread; + m_ObjCallBack = objCB; + m_ThreadRetDB = NULL; + m_KillThread = false; + + if (data != NULL) + { + m_ObjData = data; + m_ObjData->Own(); + } + else + m_ObjData = NULL; + + ret = pthread_create(&m_ThreadId, NULL, ThreadFunction, (void *) this); + ASSERT((ret == 0), "Thread creation failed") +} + +/** + * Dtor + */ +Thread::~Thread() +{ + m_StreamTransport->closeReader (); + close(m_PipeFd[1]); + + delete m_PipeProtocol; + delete m_PipeProtocolFactory; + delete m_StreamTransport; +} + +/** + *\brief: Thread function to start in a thread. + * This function will call the threadFunction which has to do the main + * job. On successful completion of the main thread function or failure, + * it will write the status to the pipe and quit. + */ +void* +Thread::ThreadFunction(void *param) +{ + Thread *pThis = (Thread *)param; + int ret = -1; + pThis->m_ThreadRetDB = pThis->m_ThreadFunc(pThis->m_ObjThread, + pThis->m_ObjData); + + int status = ThreadComplete; + + while (ret <= 0) + ret = write(pThis->m_PipeFd[1], &status, sizeof(ThreadStatus)); + pthread_exit(0); +} + +/** + *\brief Ctor for PipeProtocol + */ +Thread::PipeProtocol::PipeProtocol (Reactor *reactor,StreamProtocolFactory *factory, + StreamTransport *transport, Thread *pthis): + StreamProtocol (reactor, (StreamProtocolFactory *)factory, transport) +{ + m_Thread = pthis; + m_Transport = transport; + m_Transport->startReading(); +} + +/** + *\brief Called by protocol when data is available on the pipe + */ +void Thread::PipeProtocol::DataAvailable(DataBuffer *data) +{ + LOG("%s", __FUNCTION__); + ASSERT ((data != NULL), "DataBuffer is empty"); + pthread_t threadId = m_Thread->getThreadId(); + pthread_join(threadId, NULL); + if (m_Thread->m_ObjData != NULL) + m_Thread->m_ObjData->DisOwn(); + + if (m_Thread->IsKillThreadSet() == true) + { + if (m_Thread->m_ThreadRetDB != NULL) + m_Thread->m_ThreadRetDB->DisOwn(); + m_Thread->Die(); + return; + } + + Thread::ThreadStatus *status = (ThreadStatus*)data->Data(); + if ((*status == Thread::ThreadComplete)) + m_Thread->m_ThreadCB(m_Thread->m_ObjCallBack, m_Thread->m_ThreadRetDB); + if (m_Thread->m_ThreadRetDB != NULL) + m_Thread->m_ThreadRetDB->DisOwn(); +} + +/** + *\brief after completion of the task call this to delete the thread object + */ +void Thread::Die() +{ + delete this; +} + +/* + Local Variables: + mode: c++ + indent-tabs-mode: nil + tab-width: 4 + c-file-style: "gnu" + End: +*/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |