[asycxx-devel] SF.net SVN: asycxx:[60] trunk
Status: Alpha
Brought to you by:
joe_steeve
From: <joe...@us...> - 2009-04-22 14:23:48
|
Revision: 60 http://asycxx.svn.sourceforge.net/asycxx/?rev=60&view=rev Author: joe_steeve Date: 2009-04-22 13:39:32 +0000 (Wed, 22 Apr 2009) Log Message: ----------- minor cleanup and refactor of Thread.* From: Joe Steeve <js...@hi...> Modified Paths: -------------- trunk/include/asycxx/Thread.h trunk/src/Thread.cxx Modified: trunk/include/asycxx/Thread.h =================================================================== --- trunk/include/asycxx/Thread.h 2009-04-22 13:38:53 UTC (rev 59) +++ trunk/include/asycxx/Thread.h 2009-04-22 13:39:32 UTC (rev 60) @@ -17,92 +17,107 @@ #include <unistd.h> #include <errno.h> #include <string.h> +#include <pthread.h> #include "Error.h" #include "StreamTransport.h" #include "StreamProtocol.h" #include "StreamProtocolFactory.h" +#include "DataBuffer.h" namespace asycxx { - typedef DataBuffer* (*ThreadFunction_t)(void* objThread, DataBuffer* obj); - typedef void (*CallbackFunction_t)(void* obj, DataBuffer *param); + typedef DataBuffer* (*ThreadFunction_t)(void* obj, DataBuffer* param); + typedef void (*CallbackFunction_t)(void* obj, DataBuffer *result); class Thread { enum ThreadStatus + { + ThreadComplete = 1, + ThreadKilled + }; + + public: + Thread(Reactor *reactor, + ThreadFunction_t th_fn, void *th_obj, DataBuffer *th_param, + CallbackFunction_t cb_fn, void *cb_obj); + + pthread_t getThreadId(void) { return m_ThreadId; } + + void Own (void) { m_Owned = true; } + void DisOwn (void); + bool IsOwned (void) { return m_Owned; } + + private: + // PipeProtocol class + class PipeProtocol: public StreamProtocol { - ThreadComplete = 0, - ThreadKilled + public: + PipeProtocol (Reactor *reactor, StreamProtocolFactory *factory, + StreamTransport *transport, Thread *thread); + virtual ~PipeProtocol () {} + + virtual StreamTransport* GetTransport (void) { return m_Transport; } + virtual void DataAvailable (DataBuffer *data); + + private: + Thread *m_Thread; + StreamTransport * m_Transport; }; + //PipeProtocolFactory class + class PipeProtocolFactory: public StreamProtocolFactory + { public: - Thread(Reactor *reactor, ThreadFunction_t thread_fn, - void *objThread, DataBuffer *data, - CallbackFunction_t cb_fn, - void *objCB); - pthread_t getThreadId(void) { return m_ThreadId; } + PipeProtocolFactory (Reactor *reactor, Thread *thread) + : StreamProtocolFactory (reactor) + { + m_Thread = thread; + } + ~PipeProtocolFactory () {} - void SetKill(){ m_KillThread = true; } - bool IsKillThreadSet() { return m_KillThread; } - void Die(); + PipeProtocol *GetProtocol (StreamTransport *trans) + { + return new PipeProtocol (m_Reactor, this, trans, m_Thread); + } private: - // PipeProtocol class - class PipeProtocol: public StreamProtocol - { - public: - PipeProtocol (Reactor *reactor,StreamProtocolFactory *factory, - StreamTransport *transport, Thread *pthis); + Thread *m_Thread; + }; - virtual ~PipeProtocol () {} + private: + ~Thread (); - /** - * \brief get the current Transport - */ - virtual StreamTransport* GetTransport (void) { return m_Transport; } - virtual void DataAvailable(DataBuffer *data); + static void * ThreadFunction (void *obj); - private: - Thread *m_Thread; - StreamTransport * m_Transport; - }; + pthread_t m_ThreadId; + int m_PipeFd[2]; - //PipeProtocolFactory class - class PipeProtocolFactory:StreamProtocolFactory - { - public: - PipeProtocolFactory (Reactor *reactor, Thread *pthis) : - StreamProtocolFactory (reactor) - { - m_Thread = pthis; - } - ~PipeProtocolFactory () {} - PipeProtocol *GetProtocol (StreamTransport *trans) - { - return new PipeProtocol (m_Reactor, this, trans, m_Thread); - } + ThreadFunction_t m_Thread_Func; + void *m_Thread_Obj; + CallbackFunction_t m_CB_Func; + void *m_CB_Obj; - private: - Thread *m_Thread; - }; + DataBuffer *m_Thread_Param; + DataBuffer *m_Thread_Result; - private: - ~Thread(); - static void *ThreadFunction(void *param); - pthread_t m_ThreadId; - int m_PipeFd[2]; - ThreadFunction_t m_ThreadFunc; - CallbackFunction_t m_ThreadCB; - void *m_ObjThread; - void *m_ObjCallBack; - DataBuffer *m_ObjData; - PipeProtocol *m_PipeProtocol; - DataBuffer *m_ThreadRetDB; - PipeProtocolFactory *m_PipeProtocolFactory; - StreamTransport *m_StreamTransport; - bool m_KillThread; + PipeProtocol *m_PipeProtocol; + PipeProtocolFactory *m_PipeProtocolFactory; + StreamTransport *m_StreamTransport; + + bool m_Owned; + bool m_ThreadAlive; }; } #endif //__HIPRO_ASYCXX__THREAD_H__ + +/* + Local Variables: + mode: c++ + indent-tabs-mode: nil + tab-width: 4 + c-file-style: "gnu" + End: +*/ Modified: trunk/src/Thread.cxx =================================================================== --- trunk/src/Thread.cxx 2009-04-22 13:38:53 UTC (rev 59) +++ trunk/src/Thread.cxx 2009-04-22 13:39:32 UTC (rev 60) @@ -1,3 +1,4 @@ + /******************************************************************** * Copyright (C) 2008,2009 HiPro IT Solutions Pvt. Ltd., Chennai. All * rights reserved. @@ -22,134 +23,174 @@ #include <sys/socket.h> #include <errno.h> #include <fcntl.h> +#include <pthread.h> - #include "asycxx-common.h" #include <asycxx/Error.h> #include <asycxx/Protocol.h> #include <asycxx/Thread.h> +#include <asycxx/DataBuffer.h> using namespace asycxx; -/** - *Ctor + +/* + * Ctor */ -Thread::Thread(Reactor *reactor, ThreadFunction_t thread_fn, - void *objThread, DataBuffer *data, - CallbackFunction_t cb_fn, - void *objCB) +Thread::Thread(Reactor *reactor, + ThreadFunction_t th_fn, void * th_obj, DataBuffer *th_param, + CallbackFunction_t cb_fn, void *cb_obj) { int ret = pipe(m_PipeFd); ASSERT((ret == 0), "Pipe creation for thread communication failed."); - SetFDAsNonBlocking(m_PipeFd[0]); - SetFDAsNonBlocking(m_PipeFd[1]); + /* setup a protocol with transport to handle the thread's completion + notification message over a UNIX-pipe */ m_PipeProtocolFactory = new PipeProtocolFactory(reactor, this); - - /* create a transport */ m_StreamTransport = new StreamTransport (reactor, m_PipeFd[0]); m_PipeProtocol = m_PipeProtocolFactory->GetProtocol(m_StreamTransport); m_StreamTransport->setProtocol (m_PipeProtocol); - m_ThreadFunc = thread_fn; - m_ThreadCB = cb_fn; - m_ObjThread = objThread; - m_ObjCallBack = objCB; - m_ThreadRetDB = NULL; - m_KillThread = false; + m_Thread_Func = th_fn; + m_Thread_Obj = th_obj; + m_CB_Func = cb_fn; + m_CB_Obj = cb_obj; - if (data != NULL) - { - m_ObjData = data; - m_ObjData->Own(); - } - else - m_ObjData = NULL; + /* take ownership of the thread's parameter */ + m_Thread_Param = th_param; + if (th_param != NULL) + { + th_param->Own(); + } + m_Thread_Result =NULL; - ret = pthread_create(&m_ThreadId, NULL, ThreadFunction, (void *) this); - ASSERT((ret == 0), "Thread creation failed") + /* by default we are owned by our owner */ + m_Owned = true; + + /* start the thread */ + ret = pthread_create(&m_ThreadId, NULL, ThreadFunction, (void *)this); + ASSERT((ret == 0), "Thread creation failed"); + m_ThreadAlive = true; } -/** + +/* * Dtor */ Thread::~Thread() { - m_StreamTransport->closeReader (); close(m_PipeFd[1]); + /* disown the buffers we own */ + if (m_Thread_Result != NULL) + { + m_Thread_Result->DisOwn(); + } + if (m_Thread_Param != NULL) + { + m_Thread_Param->DisOwn(); + } + delete m_PipeProtocol; delete m_PipeProtocolFactory; delete m_StreamTransport; } -/** - *\brief: Thread function to start in a thread. - * This function will call the threadFunction which has to do the main - * job. On successful completion of the main thread function or failure, - * it will write the status to the pipe and quit. + +void Thread::DisOwn (void) +{ + m_Owned = false; + if (m_ThreadAlive == false) + { + /* we got no owners and thread is not around. So, just die */ + delete this; + } +} + + +/* + * Thread function to start in a thread. + * + * This function will call the 'function to be called in thread' in a + * thread. On successful completion of the main thread function or + * failure, it will write the status to the pipe and quit. */ void* -Thread::ThreadFunction(void *param) +Thread::ThreadFunction (void *obj) { - Thread *pThis = (Thread *)param; - int ret = -1; - pThis->m_ThreadRetDB = pThis->m_ThreadFunc(pThis->m_ObjThread, - pThis->m_ObjData); + ASSERT((obj != NULL), "bad parameter"); + + Thread *pThis = (Thread *)obj; + int ret; + ASSERT((pThis->m_Thread_Func != NULL), "badly constructed"); + + pThis->m_Thread_Result = pThis->m_Thread_Func (pThis->m_Thread_Obj, + pThis->m_Thread_Param); + + if (pThis->m_Thread_Result != NULL) + { + pThis->m_Thread_Result->Own (); + } + int status = ThreadComplete; + ret = write(pThis->m_PipeFd[1], &status, sizeof(ThreadStatus)); + if (ret == -1) + { + ERR ("(THREAD-END) writing to pipe : %s", strerror(errno)); + } - while (ret <= 0) - ret = write(pThis->m_PipeFd[1], &status, sizeof(ThreadStatus)); pthread_exit(0); } -/** - *\brief Ctor for PipeProtocol + +/* + * Ctor for PipeProtocol */ -Thread::PipeProtocol::PipeProtocol (Reactor *reactor,StreamProtocolFactory *factory, - StreamTransport *transport, Thread *pthis): - StreamProtocol (reactor, (StreamProtocolFactory *)factory, transport) +Thread::PipeProtocol::PipeProtocol (Reactor *reactor, + StreamProtocolFactory *factory, + StreamTransport *transport, Thread *thread) + : StreamProtocol (reactor, (StreamProtocolFactory *)factory, transport) { - m_Thread = pthis; + m_Thread = thread; m_Transport = transport; m_Transport->startReading(); } -/** - *\brief Called by protocol when data is available on the pipe + +/* + * Called by protocol when data is available on the pipe */ -void Thread::PipeProtocol::DataAvailable(DataBuffer *data) +void Thread::PipeProtocol::DataAvailable (DataBuffer *data) { - LOG("%s", __FUNCTION__); ASSERT ((data != NULL), "DataBuffer is empty"); - pthread_t threadId = m_Thread->getThreadId(); - pthread_join(threadId, NULL); - if (m_Thread->m_ObjData != NULL) - m_Thread->m_ObjData->DisOwn(); - if (m_Thread->IsKillThreadSet() == true) - { - if (m_Thread->m_ThreadRetDB != NULL) - m_Thread->m_ThreadRetDB->DisOwn(); - m_Thread->Die(); - return; - } - Thread::ThreadStatus *status = (ThreadStatus*)data->Data(); - if ((*status == Thread::ThreadComplete)) - m_Thread->m_ThreadCB(m_Thread->m_ObjCallBack, m_Thread->m_ThreadRetDB); - if (m_Thread->m_ThreadRetDB != NULL) - m_Thread->m_ThreadRetDB->DisOwn(); -} + if (*status == Thread::ThreadComplete) + { + /* thread has completed */ + pthread_t threadId = m_Thread->getThreadId(); + pthread_join (threadId, NULL); -/** - *\brief after completion of the task call this to delete the thread object - */ -void Thread::Die() -{ - delete this; + /* Are we still owned? */ + if ((m_Thread->IsOwned() == true) && (m_Thread->m_CB_Func != NULL)) + { + m_Thread->m_CB_Func (m_Thread->m_CB_Obj, m_Thread->m_Thread_Result); + } + } + + /* Do we have a owner? */ + if (m_Thread->IsOwned() == false) + { + /* we just got to die :-S */ + delete m_Thread; + } + + /* mark the thread as dead so that future 'DisOwn's will delete + us */ + m_Thread->m_ThreadAlive = false; } + /* Local Variables: mode: c++ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |