[asycxx-devel] SF.net SVN: asycxx:[57] trunk
Status: Alpha
Brought to you by:
joe_steeve
|
From: <viv...@us...> - 2009-04-20 09:28:39
|
Revision: 57
http://asycxx.svn.sourceforge.net/asycxx/?rev=57&view=rev
Author: vivekanand83
Date: 2009-04-20 09:28:37 +0000 (Mon, 20 Apr 2009)
Log Message:
-----------
Added threading support.
Modified Paths:
--------------
trunk/configure.ac
trunk/examples/Makefile.am
trunk/examples/echoserver/Makefile.am
trunk/include/asycxx/Transport.h
trunk/src/Makefile.am
Added Paths:
-----------
trunk/examples/threadtest/
trunk/examples/threadtest/Makefile.am
trunk/examples/threadtest/Threadtest.cxx
trunk/include/asycxx/Thread.h
trunk/src/Thread.cxx
Modified: trunk/configure.ac
===================================================================
--- trunk/configure.ac 2009-04-16 13:24:59 UTC (rev 56)
+++ trunk/configure.ac 2009-04-20 09:28:37 UTC (rev 57)
@@ -71,6 +71,7 @@
examples/Makefile
examples/echoserver/Makefile
examples/canserver/Makefile
+examples/threadtest/Makefile
asycxx-0.1.pc
])
AC_OUTPUT
Modified: trunk/examples/Makefile.am
===================================================================
--- trunk/examples/Makefile.am 2009-04-16 13:24:59 UTC (rev 56)
+++ trunk/examples/Makefile.am 2009-04-20 09:28:37 UTC (rev 57)
@@ -1,2 +1,2 @@
-SUBDIRS = echoserver #canserver
+SUBDIRS = echoserver threadtest #canserver
Modified: trunk/examples/echoserver/Makefile.am
===================================================================
--- trunk/examples/echoserver/Makefile.am 2009-04-16 13:24:59 UTC (rev 56)
+++ trunk/examples/echoserver/Makefile.am 2009-04-20 09:28:37 UTC (rev 57)
@@ -4,7 +4,7 @@
EchoServer_SOURCES = EchoServer.cxx
EchoServer_LDADD = $(top_srcdir)/src/libasycxx-0.1.la
EchoServer_CPPFLAGS = -I$(top_srcdir)/include
-EchoServer_LDFLAGS = -L$(top_srcdir)/src
+EchoServer_LDFLAGS = -L$(top_srcdir)/src -lpthread
EchoServer_debug_SOURCES = $(EchoServer_SOURCES)
EchoServer_debug_CXXFLAGS = -g $(AM_CXXFLAGS)
Added: trunk/examples/threadtest/Makefile.am
===================================================================
--- trunk/examples/threadtest/Makefile.am (rev 0)
+++ trunk/examples/threadtest/Makefile.am 2009-04-20 09:28:37 UTC (rev 57)
@@ -0,0 +1,19 @@
+
+noinst_PROGRAMS = Threadtest Threadtest.debug
+
+Threadtest_SOURCES = Threadtest.cxx
+Threadtest_LDADD = $(top_srcdir)/src/libasycxx-0.1.la -lpthread
+Threadtest_CPPFLAGS = -I$(top_srcdir)/include
+Threadtest_LDFLAGS = -L$(top_srcdir)/src
+
+Threadtest_debug_SOURCES = $(Threadtest_SOURCES)
+Threadtest_debug_CXXFLAGS = -g $(AM_CXXFLAGS)
+Threadtest_debug_LDADD = $(top_srcdir)/src/.libs/libasycxx-dbg-0.1.a -lpthread
+Threadtest_debug_CPPFLAGS = -I$(top_srcdir)/include
+#Threadtest_debug_LDFLAGS = -L$(top_srcdir)/src
+
+# Local Variables:
+# mode: makefile
+# indent-tabs-mode: nil
+# tab-width: 4
+# End:
Added: trunk/examples/threadtest/Threadtest.cxx
===================================================================
--- trunk/examples/threadtest/Threadtest.cxx (rev 0)
+++ trunk/examples/threadtest/Threadtest.cxx 2009-04-20 09:28:37 UTC (rev 57)
@@ -0,0 +1,67 @@
+
+/********************************************************************
+ * Copyright (C) 2008,2009 HiPro IT Solutions Pvt. Ltd., Chennai. All
+ * rights reserved.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms described in the LICENSE file which accompanies
+ * this distribution. If the LICENSE file was not attached to this
+ * distribution or for further clarifications, please contact
+ * le...@hi....
+ *
+ *******************************************************************/
+
+#include <memory>
+
+#include <asycxx/Logger.h>
+#include <asycxx/Error.h>
+#include <asycxx/SelectReactor.h>
+#include <asycxx/StreamProtocol.h>
+#include <asycxx/StreamProtocolFactory.h>
+#include <asycxx/TCPListener.h>
+#include <asycxx/Thread.h>
+#include <asycxx/DataBuffer.h>
+
+using namespace asycxx;
+
+Logger *Log;
+
+static DataBuffer* threadFunction(void *objthread, DataBuffer *obj)
+{
+ fprintf(stderr,"Thread function called!\n");
+ return NULL;
+}
+
+static void CallbackFunc(void *obj, DataBuffer *param)
+{
+ fprintf(stderr,"CallBack function called.\n");
+}
+
+
+
+int main (void)
+{
+ Reactor *reactor;
+
+ Log = new Logger();
+ Log->EnableConsole();
+
+ reactor = new SelectReactor ();
+ Thread *thread;
+ thread = new Thread(reactor, &threadFunction,
+ NULL, NULL, &CallbackFunc,
+ NULL);
+ reactor->Run ();
+ printf ("Closing reactor");
+}
+
+/*
+ Local Variables:
+ mode: c++
+ indent-tabs-mode: nil
+ tab-width: 4
+ c-file-style: "gnu"
+ End:
+*/
+
+
Added: trunk/include/asycxx/Thread.h
===================================================================
--- trunk/include/asycxx/Thread.h (rev 0)
+++ trunk/include/asycxx/Thread.h 2009-04-20 09:28:37 UTC (rev 57)
@@ -0,0 +1,108 @@
+
+/********************************************************************
+ * Copyright (C) 2008,2009 HiPro IT Solutions Pvt. Ltd., Chennai. All
+ * rights reserved.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms described in the LICENSE file which accompanies
+ * this distribution. If the LICENSE file was not attached to this
+ * distribution or for further clarifications, please contact
+ * le...@hi....
+ *
+ *******************************************************************/
+
+#ifndef __HIPRO_ASYCXX__THREAD_H__
+#define __HIPRO_ASYCXX__THREAD_H__
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+
+#include "Error.h"
+#include "StreamTransport.h"
+#include "StreamProtocol.h"
+#include "StreamProtocolFactory.h"
+
+namespace asycxx
+{
+ typedef DataBuffer* (*ThreadFunction_t)(void* objThread, DataBuffer* obj);
+ typedef void (*CallbackFunction_t)(void* obj, DataBuffer *param);
+
+ class Thread
+ {
+ enum ThreadStatus
+ {
+ ThreadComplete = 0,
+ ThreadKilled
+ };
+
+ public:
+ Thread(Reactor *reactor, ThreadFunction_t thread_fn,
+ void *objThread, DataBuffer *data,
+ CallbackFunction_t cb_fn,
+ void *objCB);
+ pthread_t getThreadId(void) { return m_ThreadId; }
+
+ void SetKill(){ m_KillThread = true; }
+ bool IsKillThreadSet() { return m_KillThread; }
+ void Die();
+
+ private:
+ // PipeProtocol class
+ class PipeProtocol: public StreamProtocol
+ {
+ public:
+ PipeProtocol (Reactor *reactor,StreamProtocolFactory *factory,
+ StreamTransport *transport, Thread *pthis);
+
+ virtual ~PipeProtocol () {}
+
+ /**
+ * \brief get the current Transport
+ */
+ virtual StreamTransport* GetTransport (void) { return m_Transport; }
+ virtual void DataAvailable(DataBuffer *data);
+
+ private:
+ Thread *m_Thread;
+ StreamTransport * m_Transport;
+ };
+
+ //PipeProtocolFactory class
+ class PipeProtocolFactory:StreamProtocolFactory
+ {
+ public:
+ PipeProtocolFactory (Reactor *reactor, Thread *pthis) :
+ StreamProtocolFactory (reactor)
+ {
+ m_Thread = pthis;
+ }
+ ~PipeProtocolFactory () {}
+ PipeProtocol *GetProtocol (StreamTransport *trans)
+ {
+ return new PipeProtocol (m_Reactor, this, trans, m_Thread);
+ }
+
+ private:
+ Thread *m_Thread;
+ };
+
+ private:
+ ~Thread();
+ static void *ThreadFunction(void *param);
+ pthread_t m_ThreadId;
+ int m_PipeFd[2];
+ ThreadFunction_t m_ThreadFunc;
+ CallbackFunction_t m_ThreadCB;
+ void *m_ObjThread;
+ void *m_ObjCallBack;
+ DataBuffer *m_ObjData;
+ PipeProtocol *m_PipeProtocol;
+ DataBuffer *m_ThreadRetDB;
+ PipeProtocolFactory *m_PipeProtocolFactory;
+ StreamTransport *m_StreamTransport;
+ bool m_KillThread;
+ };
+}
+
+#endif //__HIPRO_ASYCXX__THREAD_H__
Modified: trunk/include/asycxx/Transport.h
===================================================================
--- trunk/include/asycxx/Transport.h 2009-04-16 13:24:59 UTC (rev 56)
+++ trunk/include/asycxx/Transport.h 2009-04-20 09:28:37 UTC (rev 57)
@@ -40,22 +40,22 @@
/**
* \brief Sets the transport ready for reading
*/
- void startReading (void);
+ virtual void startReading (void);
/**
* \brief Sets the transport to stop reading
*/
- void stopReading (void);
+ virtual void stopReading (void);
/**
* \brief Sets the transport ready for writing
*/
- void startWriting (void);
+ virtual void startWriting (void);
/**
* \brief Sets the transport to stop writing
*/
- void stopWriting (void);
+ virtual void stopWriting (void);
/**
* \brief Attach the given protocol to the Transport
Modified: trunk/src/Makefile.am
===================================================================
--- trunk/src/Makefile.am 2009-04-16 13:24:59 UTC (rev 56)
+++ trunk/src/Makefile.am 2009-04-20 09:28:37 UTC (rev 57)
@@ -22,7 +22,8 @@
GimpleMsgBus.cxx \
core.cxx \
RawCANTransport.cxx \
- RawCANListener.cxx
+ RawCANListener.cxx \
+ Thread.cxx
libasycxx_0_1_la_CPPFLAGS = -I$(top_srcdir)/include
Added: trunk/src/Thread.cxx
===================================================================
--- trunk/src/Thread.cxx (rev 0)
+++ trunk/src/Thread.cxx 2009-04-20 09:28:37 UTC (rev 57)
@@ -0,0 +1,160 @@
+/********************************************************************
+ * Copyright (C) 2008,2009 HiPro IT Solutions Pvt. Ltd., Chennai. All
+ * rights reserved.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms described in the LICENSE file which accompanies
+ * this distribution. If the LICENSE file was not attached to this
+ * distribution or for further clarifications, please contact
+ * le...@hi....
+ *
+ *******************************************************************/
+
+#ifdef HAVE_CONFIG_H
+#include <asycxx-config.h>
+#endif
+
+#include <errno.h>
+#include <unistd.h>
+#include <string.h>
+#include <deque>
+
+#include <sys/socket.h>
+#include <errno.h>
+#include <fcntl.h>
+
+
+#include "asycxx-common.h"
+#include <asycxx/Error.h>
+#include <asycxx/Protocol.h>
+#include <asycxx/Thread.h>
+
+using namespace asycxx;
+/**
+ *Ctor
+ */
+Thread::Thread(Reactor *reactor, ThreadFunction_t thread_fn,
+ void *objThread, DataBuffer *data,
+ CallbackFunction_t cb_fn,
+ void *objCB)
+{
+ int ret = pipe(m_PipeFd);
+ ASSERT((ret == 0), "Pipe creation for thread communication failed.");
+
+ SetFDAsNonBlocking(m_PipeFd[0]);
+ SetFDAsNonBlocking(m_PipeFd[1]);
+ m_PipeProtocolFactory = new PipeProtocolFactory(reactor, this);
+
+ /* create a transport */
+ m_StreamTransport = new StreamTransport (reactor, m_PipeFd[0]);
+ m_PipeProtocol = m_PipeProtocolFactory->GetProtocol(m_StreamTransport);
+ m_StreamTransport->setProtocol (m_PipeProtocol);
+
+ m_ThreadFunc = thread_fn;
+ m_ThreadCB = cb_fn;
+ m_ObjThread = objThread;
+ m_ObjCallBack = objCB;
+ m_ThreadRetDB = NULL;
+ m_KillThread = false;
+
+ if (data != NULL)
+ {
+ m_ObjData = data;
+ m_ObjData->Own();
+ }
+ else
+ m_ObjData = NULL;
+
+ ret = pthread_create(&m_ThreadId, NULL, ThreadFunction, (void *) this);
+ ASSERT((ret == 0), "Thread creation failed")
+}
+
+/**
+ * Dtor
+ */
+Thread::~Thread()
+{
+ m_StreamTransport->closeReader ();
+ close(m_PipeFd[1]);
+
+ delete m_PipeProtocol;
+ delete m_PipeProtocolFactory;
+ delete m_StreamTransport;
+}
+
+/**
+ *\brief: Thread function to start in a thread.
+ * This function will call the threadFunction which has to do the main
+ * job. On successful completion of the main thread function or failure,
+ * it will write the status to the pipe and quit.
+ */
+void*
+Thread::ThreadFunction(void *param)
+{
+ Thread *pThis = (Thread *)param;
+ int ret = -1;
+ pThis->m_ThreadRetDB = pThis->m_ThreadFunc(pThis->m_ObjThread,
+ pThis->m_ObjData);
+
+ int status = ThreadComplete;
+
+ while (ret <= 0)
+ ret = write(pThis->m_PipeFd[1], &status, sizeof(ThreadStatus));
+ pthread_exit(0);
+}
+
+/**
+ *\brief Ctor for PipeProtocol
+ */
+Thread::PipeProtocol::PipeProtocol (Reactor *reactor,StreamProtocolFactory *factory,
+ StreamTransport *transport, Thread *pthis):
+ StreamProtocol (reactor, (StreamProtocolFactory *)factory, transport)
+{
+ m_Thread = pthis;
+ m_Transport = transport;
+ m_Transport->startReading();
+}
+
+/**
+ *\brief Called by protocol when data is available on the pipe
+ */
+void Thread::PipeProtocol::DataAvailable(DataBuffer *data)
+{
+ LOG("%s", __FUNCTION__);
+ ASSERT ((data != NULL), "DataBuffer is empty");
+ pthread_t threadId = m_Thread->getThreadId();
+ pthread_join(threadId, NULL);
+ if (m_Thread->m_ObjData != NULL)
+ m_Thread->m_ObjData->DisOwn();
+
+ if (m_Thread->IsKillThreadSet() == true)
+ {
+ if (m_Thread->m_ThreadRetDB != NULL)
+ m_Thread->m_ThreadRetDB->DisOwn();
+ m_Thread->Die();
+ return;
+ }
+
+ Thread::ThreadStatus *status = (ThreadStatus*)data->Data();
+ if ((*status == Thread::ThreadComplete))
+ m_Thread->m_ThreadCB(m_Thread->m_ObjCallBack, m_Thread->m_ThreadRetDB);
+ if (m_Thread->m_ThreadRetDB != NULL)
+ m_Thread->m_ThreadRetDB->DisOwn();
+}
+
+/**
+ *\brief after completion of the task call this to delete the thread object
+ */
+void Thread::Die()
+{
+ delete this;
+}
+
+/*
+ Local Variables:
+ mode: c++
+ indent-tabs-mode: nil
+ tab-width: 4
+ c-file-style: "gnu"
+ End:
+*/
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|