[asycxx-devel] SF.net SVN: asycxx:[62] trunk
Status: Alpha
Brought to you by:
joe_steeve
|
From: <viv...@us...> - 2009-04-25 10:23:18
|
Revision: 62
http://asycxx.svn.sourceforge.net/asycxx/?rev=62&view=rev
Author: vivekanand83
Date: 2009-04-25 10:23:03 +0000 (Sat, 25 Apr 2009)
Log Message:
-----------
Impelented 'Call in Reactor' for Thread.
Modified Paths:
--------------
trunk/examples/threadtest/Threadtest.cxx
trunk/include/asycxx/Thread.h
trunk/src/Thread.cxx
Modified: trunk/examples/threadtest/Threadtest.cxx
===================================================================
--- trunk/examples/threadtest/Threadtest.cxx 2009-04-22 13:40:03 UTC (rev 61)
+++ trunk/examples/threadtest/Threadtest.cxx 2009-04-25 10:23:03 UTC (rev 62)
@@ -27,12 +27,20 @@
Logger *Log;
-static DataBuffer* threadFunction(void *objthread, DataBuffer *obj)
+static void callInReatorFn(void *th_obj, DataBuffer *data)
{
- fprintf(stderr,"Thread function called!\n");
+ fprintf (stderr, "%s called\n", __FUNCTION__);
+}
+
+static DataBuffer* threadFunction(void *thmain_obj,void *objthread, DataBuffer *obj)
+{
+ fprintf(stderr,"Thread function called!\n");
+ Thread *mainThread = (Thread*)thmain_obj;
+ mainThread->CallInReactor(mainThread, &callInReatorFn, NULL);
return NULL;
}
+
static void CallbackFunc(void *obj, DataBuffer *param)
{
fprintf(stderr,"CallBack function called.\n");
Modified: trunk/include/asycxx/Thread.h
===================================================================
--- trunk/include/asycxx/Thread.h 2009-04-22 13:40:03 UTC (rev 61)
+++ trunk/include/asycxx/Thread.h 2009-04-25 10:23:03 UTC (rev 62)
@@ -27,14 +27,17 @@
namespace asycxx
{
- typedef DataBuffer* (*ThreadFunction_t)(void* obj, DataBuffer* param);
- typedef void (*CallbackFunction_t)(void* obj, DataBuffer *result);
-
+ typedef DataBuffer* (*ThreadFunction_t)(void *thmain_obj, void* obj,
+ DataBuffer* param);
+ typedef void (*CallbackFunction_t)(void* obj, DataBuffer *result);
+ typedef void (*CallInReactorFunction_t)(void *th_obj,
+ DataBuffer *data);
class Thread
{
enum ThreadStatus
{
ThreadComplete = 1,
+ ThreadCallInReactor,
ThreadKilled
};
@@ -44,6 +47,9 @@
CallbackFunction_t cb_fn, void *cb_obj);
pthread_t getThreadId(void) { return m_ThreadId; }
+ void CallInReactor(Thread *th_obj, CallInReactorFunction_t cir_fn,
+ DataBuffer *cir_param);
+ void HandleCallInReactor(void);
void Own (void) { m_Owned = true; }
void DisOwn (void);
@@ -89,7 +95,7 @@
private:
~Thread ();
- static void * ThreadFunction (void *obj);
+ static void* ThreadFunction_Main(void *obj);
pthread_t m_ThreadId;
int m_PipeFd[2];
@@ -99,6 +105,9 @@
CallbackFunction_t m_CB_Func;
void *m_CB_Obj;
+ CallInReactorFunction_t m_CIRFunc;
+ DataBuffer *m_CIR_Param;
+
DataBuffer *m_Thread_Param;
DataBuffer *m_Thread_Result;
@@ -107,7 +116,8 @@
StreamTransport *m_StreamTransport;
bool m_Owned;
- bool m_ThreadAlive;
+ bool m_ThreadAlive;
+ pthread_mutex_t m_CallInThreadMutex;
};
}
Modified: trunk/src/Thread.cxx
===================================================================
--- trunk/src/Thread.cxx 2009-04-22 13:40:03 UTC (rev 61)
+++ trunk/src/Thread.cxx 2009-04-25 10:23:03 UTC (rev 62)
@@ -66,9 +66,10 @@
/* by default we are owned by our owner */
m_Owned = true;
+ pthread_mutex_init(&m_CallInThreadMutex, NULL);
/* start the thread */
- ret = pthread_create(&m_ThreadId, NULL, ThreadFunction, (void *)this);
- ASSERT((ret == 0), "Thread creation failed");
+ ret = pthread_create(&m_ThreadId, NULL, ThreadFunction_Main, (void *)this);
+ ASSERT((ret == 0), "Thread creation failed %s", strerror(errno));
m_ThreadAlive = true;
}
@@ -90,9 +91,10 @@
m_Thread_Param->DisOwn();
}
- delete m_PipeProtocol;
- delete m_PipeProtocolFactory;
- delete m_StreamTransport;
+ LOG("Destroying thread");
+// delete m_PipeProtocol;
+// delete m_PipeProtocolFactory;
+// delete m_StreamTransport;
}
@@ -102,6 +104,7 @@
if (m_ThreadAlive == false)
{
/* we got no owners and thread is not around. So, just die */
+ m_StreamTransport->loseConnection();
delete this;
}
}
@@ -115,7 +118,7 @@
* failure, it will write the status to the pipe and quit.
*/
void*
-Thread::ThreadFunction (void *obj)
+Thread::ThreadFunction_Main (void *obj)
{
ASSERT((obj != NULL), "bad parameter");
@@ -124,7 +127,7 @@
ASSERT((pThis->m_Thread_Func != NULL), "badly constructed");
- pThis->m_Thread_Result = pThis->m_Thread_Func (pThis->m_Thread_Obj,
+ pThis->m_Thread_Result = pThis->m_Thread_Func (pThis,pThis->m_Thread_Obj,
pThis->m_Thread_Param);
if (pThis->m_Thread_Result != NULL)
@@ -132,13 +135,13 @@
pThis->m_Thread_Result->Own ();
}
- int status = ThreadComplete;
+ int status = ThreadComplete;
ret = write(pThis->m_PipeFd[1], &status, sizeof(ThreadStatus));
if (ret == -1)
- {
- ERR ("(THREAD-END) writing to pipe : %s", strerror(errno));
- }
-
+ {
+ ERR ("(THREAD-END) writing to pipe : %s", strerror(errno));
+ }
+
pthread_exit(0);
}
@@ -166,22 +169,29 @@
Thread::ThreadStatus *status = (ThreadStatus*)data->Data();
if (*status == Thread::ThreadComplete)
- {
- /* thread has completed */
- pthread_t threadId = m_Thread->getThreadId();
- pthread_join (threadId, NULL);
+ {
+ /* thread has completed */
+ pthread_t threadId = m_Thread->getThreadId();
+ pthread_join (threadId, NULL);
- /* Are we still owned? */
- if ((m_Thread->IsOwned() == true) && (m_Thread->m_CB_Func != NULL))
- {
- m_Thread->m_CB_Func (m_Thread->m_CB_Obj, m_Thread->m_Thread_Result);
- }
- }
+ /* Are we still owned? */
+ if ((m_Thread->IsOwned() == true) && (m_Thread->m_CB_Func != NULL))
+ {
+ m_Thread->m_CB_Func (m_Thread->m_CB_Obj, m_Thread->m_Thread_Result);
+ }
+ }
+ else if (*status == Thread::ThreadCallInReactor)
+ {
+ m_Thread->HandleCallInReactor();
+ return;
+ }
/* Do we have a owner? */
if (m_Thread->IsOwned() == false)
{
/* we just got to die :-S */
+ LOG("deleting thread");
+ m_Transport->loseConnection();
delete m_Thread;
}
@@ -190,7 +200,32 @@
m_Thread->m_ThreadAlive = false;
}
+void Thread::CallInReactor(Thread *thmain_obj, CallInReactorFunction_t cir_fn,
+ DataBuffer *cir_param)
+{
+ thmain_obj->m_CIRFunc = cir_fn;
+ thmain_obj->m_CIR_Param = cir_param;
+ pthread_mutex_lock(&thmain_obj->m_CallInThreadMutex);
+
+ int status = ThreadCallInReactor;
+ int ret = write(thmain_obj->m_PipeFd[1], &status, sizeof(ThreadStatus));
+ if (ret == -1)
+ {
+ ERR ("(THREAD-END) writing to pipe : %s", strerror(errno));
+ }
+
+ pthread_mutex_lock(&thmain_obj->m_CallInThreadMutex);
+ pthread_mutex_unlock(&thmain_obj->m_CallInThreadMutex);
+ return;
+}
+void Thread::HandleCallInReactor(void)
+{
+ m_CIRFunc(m_Thread_Obj, m_CIR_Param);
+ pthread_mutex_unlock(&m_CallInThreadMutex);
+}
+
+
/*
Local Variables:
mode: c++
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|