[asycxx-devel] SF.net SVN: asycxx:[62] trunk
Status: Alpha
Brought to you by:
joe_steeve
From: <viv...@us...> - 2009-04-25 10:23:18
|
Revision: 62 http://asycxx.svn.sourceforge.net/asycxx/?rev=62&view=rev Author: vivekanand83 Date: 2009-04-25 10:23:03 +0000 (Sat, 25 Apr 2009) Log Message: ----------- Implemented 'Call in Reactor' for Thread. Modified Paths: -------------- trunk/examples/threadtest/Threadtest.cxx trunk/include/asycxx/Thread.h trunk/src/Thread.cxx Modified: trunk/examples/threadtest/Threadtest.cxx =================================================================== --- trunk/examples/threadtest/Threadtest.cxx 2009-04-22 13:40:03 UTC (rev 61) +++ trunk/examples/threadtest/Threadtest.cxx 2009-04-25 10:23:03 UTC (rev 62) @@ -27,12 +27,20 @@ Logger *Log; -static DataBuffer* threadFunction(void *objthread, DataBuffer *obj) +static void callInReatorFn(void *th_obj, DataBuffer *data) { - fprintf(stderr,"Thread function called!\n"); + fprintf (stderr, "%s called\n", __FUNCTION__); +} + +static DataBuffer* threadFunction(void *thmain_obj,void *objthread, DataBuffer *obj) +{ + fprintf(stderr,"Thread function called!\n"); + Thread *mainThread = (Thread*)thmain_obj; + mainThread->CallInReactor(mainThread, &callInReatorFn, NULL); return NULL; } + static void CallbackFunc(void *obj, DataBuffer *param) { fprintf(stderr,"CallBack function called.\n"); Modified: trunk/include/asycxx/Thread.h =================================================================== --- trunk/include/asycxx/Thread.h 2009-04-22 13:40:03 UTC (rev 61) +++ trunk/include/asycxx/Thread.h 2009-04-25 10:23:03 UTC (rev 62) @@ -27,14 +27,17 @@ namespace asycxx { - typedef DataBuffer* (*ThreadFunction_t)(void* obj, DataBuffer* param); - typedef void (*CallbackFunction_t)(void* obj, DataBuffer *result); - + typedef DataBuffer* (*ThreadFunction_t)(void *thmain_obj, void* obj, + DataBuffer* param); + typedef void (*CallbackFunction_t)(void* obj, DataBuffer *result); + typedef void (*CallInReactorFunction_t)(void *th_obj, + DataBuffer *data); class Thread { enum ThreadStatus { ThreadComplete = 1, + ThreadCallInReactor, ThreadKilled }; @@ 
-44,6 +47,9 @@ CallbackFunction_t cb_fn, void *cb_obj); pthread_t getThreadId(void) { return m_ThreadId; } + void CallInReactor(Thread *th_obj, CallInReactorFunction_t cir_fn, + DataBuffer *cir_param); + void HandleCallInReactor(void); void Own (void) { m_Owned = true; } void DisOwn (void); @@ -89,7 +95,7 @@ private: ~Thread (); - static void * ThreadFunction (void *obj); + static void* ThreadFunction_Main(void *obj); pthread_t m_ThreadId; int m_PipeFd[2]; @@ -99,6 +105,9 @@ CallbackFunction_t m_CB_Func; void *m_CB_Obj; + CallInReactorFunction_t m_CIRFunc; + DataBuffer *m_CIR_Param; + DataBuffer *m_Thread_Param; DataBuffer *m_Thread_Result; @@ -107,7 +116,8 @@ StreamTransport *m_StreamTransport; bool m_Owned; - bool m_ThreadAlive; + bool m_ThreadAlive; + pthread_mutex_t m_CallInThreadMutex; }; } Modified: trunk/src/Thread.cxx =================================================================== --- trunk/src/Thread.cxx 2009-04-22 13:40:03 UTC (rev 61) +++ trunk/src/Thread.cxx 2009-04-25 10:23:03 UTC (rev 62) @@ -66,9 +66,10 @@ /* by default we are owned by our owner */ m_Owned = true; + pthread_mutex_init(&m_CallInThreadMutex, NULL); /* start the thread */ - ret = pthread_create(&m_ThreadId, NULL, ThreadFunction, (void *)this); - ASSERT((ret == 0), "Thread creation failed"); + ret = pthread_create(&m_ThreadId, NULL, ThreadFunction_Main, (void *)this); + ASSERT((ret == 0), "Thread creation failed %s", strerror(errno)); m_ThreadAlive = true; } @@ -90,9 +91,10 @@ m_Thread_Param->DisOwn(); } - delete m_PipeProtocol; - delete m_PipeProtocolFactory; - delete m_StreamTransport; + LOG("Destroying thread"); +// delete m_PipeProtocol; +// delete m_PipeProtocolFactory; +// delete m_StreamTransport; } @@ -102,6 +104,7 @@ if (m_ThreadAlive == false) { /* we got no owners and thread is not around. So, just die */ + m_StreamTransport->loseConnection(); delete this; } } @@ -115,7 +118,7 @@ * failure, it will write the status to the pipe and quit. 
*/ void* -Thread::ThreadFunction (void *obj) +Thread::ThreadFunction_Main (void *obj) { ASSERT((obj != NULL), "bad parameter"); @@ -124,7 +127,7 @@ ASSERT((pThis->m_Thread_Func != NULL), "badly constructed"); - pThis->m_Thread_Result = pThis->m_Thread_Func (pThis->m_Thread_Obj, + pThis->m_Thread_Result = pThis->m_Thread_Func (pThis,pThis->m_Thread_Obj, pThis->m_Thread_Param); if (pThis->m_Thread_Result != NULL) @@ -132,13 +135,13 @@ pThis->m_Thread_Result->Own (); } - int status = ThreadComplete; + int status = ThreadComplete; ret = write(pThis->m_PipeFd[1], &status, sizeof(ThreadStatus)); if (ret == -1) - { - ERR ("(THREAD-END) writing to pipe : %s", strerror(errno)); - } - + { + ERR ("(THREAD-END) writing to pipe : %s", strerror(errno)); + } + pthread_exit(0); } @@ -166,22 +169,29 @@ Thread::ThreadStatus *status = (ThreadStatus*)data->Data(); if (*status == Thread::ThreadComplete) - { - /* thread has completed */ - pthread_t threadId = m_Thread->getThreadId(); - pthread_join (threadId, NULL); + { + /* thread has completed */ + pthread_t threadId = m_Thread->getThreadId(); + pthread_join (threadId, NULL); - /* Are we still owned? */ - if ((m_Thread->IsOwned() == true) && (m_Thread->m_CB_Func != NULL)) - { - m_Thread->m_CB_Func (m_Thread->m_CB_Obj, m_Thread->m_Thread_Result); - } - } + /* Are we still owned? */ + if ((m_Thread->IsOwned() == true) && (m_Thread->m_CB_Func != NULL)) + { + m_Thread->m_CB_Func (m_Thread->m_CB_Obj, m_Thread->m_Thread_Result); + } + } + else if (*status == Thread::ThreadCallInReactor) + { + m_Thread->HandleCallInReactor(); + return; + } /* Do we have a owner? 
*/ if (m_Thread->IsOwned() == false) { /* we just got to die :-S */ + LOG("deleting thread"); + m_Transport->loseConnection(); delete m_Thread; } @@ -190,7 +200,32 @@ m_Thread->m_ThreadAlive = false; } +void Thread::CallInReactor(Thread *thmain_obj, CallInReactorFunction_t cir_fn, + DataBuffer *cir_param) +{ + thmain_obj->m_CIRFunc = cir_fn; + thmain_obj->m_CIR_Param = cir_param; + pthread_mutex_lock(&thmain_obj->m_CallInThreadMutex); + + int status = ThreadCallInReactor; + int ret = write(thmain_obj->m_PipeFd[1], &status, sizeof(ThreadStatus)); + if (ret == -1) + { + ERR ("(THREAD-END) writing to pipe : %s", strerror(errno)); + } + + pthread_mutex_lock(&thmain_obj->m_CallInThreadMutex); + pthread_mutex_unlock(&thmain_obj->m_CallInThreadMutex); + return; +} +void Thread::HandleCallInReactor(void) +{ + m_CIRFunc(m_Thread_Obj, m_CIR_Param); + pthread_mutex_unlock(&m_CallInThreadMutex); +} + + /* Local Variables: mode: c++ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |