[asycxx-devel] SF.net SVN: asycxx:[60] trunk
Status: Alpha
Brought to you by:
joe_steeve
|
From: <joe...@us...> - 2009-04-22 14:23:48
|
Revision: 60
http://asycxx.svn.sourceforge.net/asycxx/?rev=60&view=rev
Author: joe_steeve
Date: 2009-04-22 13:39:32 +0000 (Wed, 22 Apr 2009)
Log Message:
-----------
minor cleanup and refactor of Thread.*
From: Joe Steeve <js...@hi...>
Modified Paths:
--------------
trunk/include/asycxx/Thread.h
trunk/src/Thread.cxx
Modified: trunk/include/asycxx/Thread.h
===================================================================
--- trunk/include/asycxx/Thread.h 2009-04-22 13:38:53 UTC (rev 59)
+++ trunk/include/asycxx/Thread.h 2009-04-22 13:39:32 UTC (rev 60)
@@ -17,92 +17,107 @@
#include <unistd.h>
#include <errno.h>
#include <string.h>
+#include <pthread.h>
#include "Error.h"
#include "StreamTransport.h"
#include "StreamProtocol.h"
#include "StreamProtocolFactory.h"
+#include "DataBuffer.h"
namespace asycxx
{
- typedef DataBuffer* (*ThreadFunction_t)(void* objThread, DataBuffer* obj);
- typedef void (*CallbackFunction_t)(void* obj, DataBuffer *param);
+ typedef DataBuffer* (*ThreadFunction_t)(void* obj, DataBuffer* param);
+ typedef void (*CallbackFunction_t)(void* obj, DataBuffer *result);
class Thread
{
enum ThreadStatus
+ {
+ ThreadComplete = 1,
+ ThreadKilled
+ };
+
+ public:
+ Thread(Reactor *reactor,
+ ThreadFunction_t th_fn, void *th_obj, DataBuffer *th_param,
+ CallbackFunction_t cb_fn, void *cb_obj);
+
+ pthread_t getThreadId(void) { return m_ThreadId; }
+
+ void Own (void) { m_Owned = true; }
+ void DisOwn (void);
+ bool IsOwned (void) { return m_Owned; }
+
+ private:
+ // PipeProtocol class
+ class PipeProtocol: public StreamProtocol
{
- ThreadComplete = 0,
- ThreadKilled
+ public:
+ PipeProtocol (Reactor *reactor, StreamProtocolFactory *factory,
+ StreamTransport *transport, Thread *thread);
+ virtual ~PipeProtocol () {}
+
+ virtual StreamTransport* GetTransport (void) { return m_Transport; }
+ virtual void DataAvailable (DataBuffer *data);
+
+ private:
+ Thread *m_Thread;
+ StreamTransport * m_Transport;
};
+ //PipeProtocolFactory class
+ class PipeProtocolFactory: public StreamProtocolFactory
+ {
public:
- Thread(Reactor *reactor, ThreadFunction_t thread_fn,
- void *objThread, DataBuffer *data,
- CallbackFunction_t cb_fn,
- void *objCB);
- pthread_t getThreadId(void) { return m_ThreadId; }
+ PipeProtocolFactory (Reactor *reactor, Thread *thread)
+ : StreamProtocolFactory (reactor)
+ {
+ m_Thread = thread;
+ }
+ ~PipeProtocolFactory () {}
- void SetKill(){ m_KillThread = true; }
- bool IsKillThreadSet() { return m_KillThread; }
- void Die();
+ PipeProtocol *GetProtocol (StreamTransport *trans)
+ {
+ return new PipeProtocol (m_Reactor, this, trans, m_Thread);
+ }
private:
- // PipeProtocol class
- class PipeProtocol: public StreamProtocol
- {
- public:
- PipeProtocol (Reactor *reactor,StreamProtocolFactory *factory,
- StreamTransport *transport, Thread *pthis);
+ Thread *m_Thread;
+ };
- virtual ~PipeProtocol () {}
+ private:
+ ~Thread ();
- /**
- * \brief get the current Transport
- */
- virtual StreamTransport* GetTransport (void) { return m_Transport; }
- virtual void DataAvailable(DataBuffer *data);
+ static void * ThreadFunction (void *obj);
- private:
- Thread *m_Thread;
- StreamTransport * m_Transport;
- };
+ pthread_t m_ThreadId;
+ int m_PipeFd[2];
- //PipeProtocolFactory class
- class PipeProtocolFactory:StreamProtocolFactory
- {
- public:
- PipeProtocolFactory (Reactor *reactor, Thread *pthis) :
- StreamProtocolFactory (reactor)
- {
- m_Thread = pthis;
- }
- ~PipeProtocolFactory () {}
- PipeProtocol *GetProtocol (StreamTransport *trans)
- {
- return new PipeProtocol (m_Reactor, this, trans, m_Thread);
- }
+ ThreadFunction_t m_Thread_Func;
+ void *m_Thread_Obj;
+ CallbackFunction_t m_CB_Func;
+ void *m_CB_Obj;
- private:
- Thread *m_Thread;
- };
+ DataBuffer *m_Thread_Param;
+ DataBuffer *m_Thread_Result;
- private:
- ~Thread();
- static void *ThreadFunction(void *param);
- pthread_t m_ThreadId;
- int m_PipeFd[2];
- ThreadFunction_t m_ThreadFunc;
- CallbackFunction_t m_ThreadCB;
- void *m_ObjThread;
- void *m_ObjCallBack;
- DataBuffer *m_ObjData;
- PipeProtocol *m_PipeProtocol;
- DataBuffer *m_ThreadRetDB;
- PipeProtocolFactory *m_PipeProtocolFactory;
- StreamTransport *m_StreamTransport;
- bool m_KillThread;
+ PipeProtocol *m_PipeProtocol;
+ PipeProtocolFactory *m_PipeProtocolFactory;
+ StreamTransport *m_StreamTransport;
+
+ bool m_Owned;
+ bool m_ThreadAlive;
};
}
#endif //__HIPRO_ASYCXX__THREAD_H__
+
+/*
+ Local Variables:
+ mode: c++
+ indent-tabs-mode: nil
+ tab-width: 4
+ c-file-style: "gnu"
+ End:
+*/
Modified: trunk/src/Thread.cxx
===================================================================
--- trunk/src/Thread.cxx 2009-04-22 13:38:53 UTC (rev 59)
+++ trunk/src/Thread.cxx 2009-04-22 13:39:32 UTC (rev 60)
@@ -1,3 +1,4 @@
+
/********************************************************************
* Copyright (C) 2008,2009 HiPro IT Solutions Pvt. Ltd., Chennai. All
* rights reserved.
@@ -22,134 +23,174 @@
#include <sys/socket.h>
#include <errno.h>
#include <fcntl.h>
+#include <pthread.h>
-
#include "asycxx-common.h"
#include <asycxx/Error.h>
#include <asycxx/Protocol.h>
#include <asycxx/Thread.h>
+#include <asycxx/DataBuffer.h>
using namespace asycxx;
-/**
- *Ctor
+
+/*
+ * Ctor
*/
-Thread::Thread(Reactor *reactor, ThreadFunction_t thread_fn,
- void *objThread, DataBuffer *data,
- CallbackFunction_t cb_fn,
- void *objCB)
+Thread::Thread(Reactor *reactor,
+ ThreadFunction_t th_fn, void * th_obj, DataBuffer *th_param,
+ CallbackFunction_t cb_fn, void *cb_obj)
{
int ret = pipe(m_PipeFd);
ASSERT((ret == 0), "Pipe creation for thread communication failed.");
- SetFDAsNonBlocking(m_PipeFd[0]);
- SetFDAsNonBlocking(m_PipeFd[1]);
+ /* setup a protocol with transport to handle the thread's completion
+ notification message over a UNIX-pipe */
m_PipeProtocolFactory = new PipeProtocolFactory(reactor, this);
-
- /* create a transport */
m_StreamTransport = new StreamTransport (reactor, m_PipeFd[0]);
m_PipeProtocol = m_PipeProtocolFactory->GetProtocol(m_StreamTransport);
m_StreamTransport->setProtocol (m_PipeProtocol);
- m_ThreadFunc = thread_fn;
- m_ThreadCB = cb_fn;
- m_ObjThread = objThread;
- m_ObjCallBack = objCB;
- m_ThreadRetDB = NULL;
- m_KillThread = false;
+ m_Thread_Func = th_fn;
+ m_Thread_Obj = th_obj;
+ m_CB_Func = cb_fn;
+ m_CB_Obj = cb_obj;
- if (data != NULL)
- {
- m_ObjData = data;
- m_ObjData->Own();
- }
- else
- m_ObjData = NULL;
+ /* take ownership of the thread's parameter */
+ m_Thread_Param = th_param;
+ if (th_param != NULL)
+ {
+ th_param->Own();
+ }
+ m_Thread_Result =NULL;
- ret = pthread_create(&m_ThreadId, NULL, ThreadFunction, (void *) this);
- ASSERT((ret == 0), "Thread creation failed")
+ /* by default we are owned by our owner */
+ m_Owned = true;
+
+ /* start the thread */
+ ret = pthread_create(&m_ThreadId, NULL, ThreadFunction, (void *)this);
+ ASSERT((ret == 0), "Thread creation failed");
+ m_ThreadAlive = true;
}
-/**
+
+/*
* Dtor
*/
Thread::~Thread()
{
- m_StreamTransport->closeReader ();
close(m_PipeFd[1]);
+ /* disown the buffers we own */
+ if (m_Thread_Result != NULL)
+ {
+ m_Thread_Result->DisOwn();
+ }
+ if (m_Thread_Param != NULL)
+ {
+ m_Thread_Param->DisOwn();
+ }
+
delete m_PipeProtocol;
delete m_PipeProtocolFactory;
delete m_StreamTransport;
}
-/**
- *\brief: Thread function to start in a thread.
- * This function will call the threadFunction which has to do the main
- * job. On successful completion of the main thread function or failure,
- * it will write the status to the pipe and quit.
+
+void Thread::DisOwn (void)
+{
+ m_Owned = false;
+ if (m_ThreadAlive == false)
+ {
+ /* we got no owners and thread is not around. So, just die */
+ delete this;
+ }
+}
+
+
+/*
+ * Thread function to start in a thread.
+ *
+ * This function will call the 'function to be called in thread' in a
+ * thread. On successful completion of the main thread function or
+ * failure, it will write the status to the pipe and quit.
*/
void*
-Thread::ThreadFunction(void *param)
+Thread::ThreadFunction (void *obj)
{
- Thread *pThis = (Thread *)param;
- int ret = -1;
- pThis->m_ThreadRetDB = pThis->m_ThreadFunc(pThis->m_ObjThread,
- pThis->m_ObjData);
+ ASSERT((obj != NULL), "bad parameter");
+
+ Thread *pThis = (Thread *)obj;
+ int ret;
+ ASSERT((pThis->m_Thread_Func != NULL), "badly constructed");
+
+ pThis->m_Thread_Result = pThis->m_Thread_Func (pThis->m_Thread_Obj,
+ pThis->m_Thread_Param);
+
+ if (pThis->m_Thread_Result != NULL)
+ {
+ pThis->m_Thread_Result->Own ();
+ }
+
int status = ThreadComplete;
+ ret = write(pThis->m_PipeFd[1], &status, sizeof(ThreadStatus));
+ if (ret == -1)
+ {
+ ERR ("(THREAD-END) writing to pipe : %s", strerror(errno));
+ }
- while (ret <= 0)
- ret = write(pThis->m_PipeFd[1], &status, sizeof(ThreadStatus));
pthread_exit(0);
}
-/**
- *\brief Ctor for PipeProtocol
+
+/*
+ * Ctor for PipeProtocol
*/
-Thread::PipeProtocol::PipeProtocol (Reactor *reactor,StreamProtocolFactory *factory,
- StreamTransport *transport, Thread *pthis):
- StreamProtocol (reactor, (StreamProtocolFactory *)factory, transport)
+Thread::PipeProtocol::PipeProtocol (Reactor *reactor,
+ StreamProtocolFactory *factory,
+ StreamTransport *transport, Thread *thread)
+ : StreamProtocol (reactor, (StreamProtocolFactory *)factory, transport)
{
- m_Thread = pthis;
+ m_Thread = thread;
m_Transport = transport;
m_Transport->startReading();
}
-/**
- *\brief Called by protocol when data is available on the pipe
+
+/*
+ * Called by protocol when data is available on the pipe
*/
-void Thread::PipeProtocol::DataAvailable(DataBuffer *data)
+void Thread::PipeProtocol::DataAvailable (DataBuffer *data)
{
- LOG("%s", __FUNCTION__);
ASSERT ((data != NULL), "DataBuffer is empty");
- pthread_t threadId = m_Thread->getThreadId();
- pthread_join(threadId, NULL);
- if (m_Thread->m_ObjData != NULL)
- m_Thread->m_ObjData->DisOwn();
- if (m_Thread->IsKillThreadSet() == true)
- {
- if (m_Thread->m_ThreadRetDB != NULL)
- m_Thread->m_ThreadRetDB->DisOwn();
- m_Thread->Die();
- return;
- }
-
Thread::ThreadStatus *status = (ThreadStatus*)data->Data();
- if ((*status == Thread::ThreadComplete))
- m_Thread->m_ThreadCB(m_Thread->m_ObjCallBack, m_Thread->m_ThreadRetDB);
- if (m_Thread->m_ThreadRetDB != NULL)
- m_Thread->m_ThreadRetDB->DisOwn();
-}
+ if (*status == Thread::ThreadComplete)
+ {
+ /* thread has completed */
+ pthread_t threadId = m_Thread->getThreadId();
+ pthread_join (threadId, NULL);
-/**
- *\brief after completion of the task call this to delete the thread object
- */
-void Thread::Die()
-{
- delete this;
+ /* Are we still owned? */
+ if ((m_Thread->IsOwned() == true) && (m_Thread->m_CB_Func != NULL))
+ {
+ m_Thread->m_CB_Func (m_Thread->m_CB_Obj, m_Thread->m_Thread_Result);
+ }
+ }
+
+ /* mark the thread as dead so that future 'DisOwn's will delete us;
+ must be done before the delete below frees m_Thread */
+ m_Thread->m_ThreadAlive = false;
+
+ /* Do we have an owner? */
+ if (m_Thread->IsOwned() == false)
+ {
+ /* ~Thread also deletes this protocol: touch nothing after this */
+ delete m_Thread;
+ }
}
+
/*
Local Variables:
mode: c++
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|