From: <sv...@ww...> - 2004-06-23 16:47:59
|
Author: mkrose Date: 2004-06-23 09:47:53 -0700 (Wed, 23 Jun 2004) New Revision: 1065 Modified: trunk/CSP/SimData/CHANGES.current trunk/CSP/SimData/Include/SimData/Thread.h trunk/CSP/SimData/Include/SimData/ThreadBase.h trunk/CSP/SimData/Include/SimData/ThreadQueue.h Log: More thread cleanup: timed join, simple thread abort mechanism, accessor for the task associated with a thread, fix for a compile error and slightly improved interfaces in the queue classes. Browse at: https://www.zerobar.net/viewcvs/viewcvs.cgi?view=rev&rev=1065 Modified: trunk/CSP/SimData/CHANGES.current =================================================================== --- trunk/CSP/SimData/CHANGES.current 2004-06-23 06:35:25 UTC (rev 1064) +++ trunk/CSP/SimData/CHANGES.current 2004-06-23 16:47:53 UTC (rev 1065) @@ -6,6 +6,10 @@ * Cleaned doxygen comments for some thread classes. + * More thread cleanup: timed join, simple thread abort mechanism, + accessor for the task associated with a thread, compile fixes + and improved interface in queue classes. + 2004-06-20: onsight * Doxygen and copyright notice updates. Modified: trunk/CSP/SimData/Include/SimData/Thread.h =================================================================== --- trunk/CSP/SimData/Include/SimData/Thread.h 2004-06-23 06:35:25 UTC (rev 1064) +++ trunk/CSP/SimData/Include/SimData/Thread.h 2004-06-23 16:47:53 UTC (rev 1065) @@ -108,13 +108,13 @@ /** Suspend execution of the current thread the specified amount of time. * - * @param seconds seconds to pause - * @param nanosecondns nanoseconds to pause (in addition to seconds) + * @param seconds seconds to pause (fractional values ok, but limited by + * the granularity of the operating system's time slices). 
*/ - static void sleep(int seconds, int nanoseconds=0) { + static void sleep(double seconds) { static ThreadCondition m_condition; m_condition.lock(); - m_condition.wait(seconds + 1e-9 * nanoseconds); + m_condition.wait(seconds); m_condition.unlock(); } @@ -145,6 +145,19 @@ */ virtual void run()=0; + /** Signal a task to abort. + * + * This is advisory only. Tasks can test for this signal by calling + * isAborted(), then ignore or honor the aborted condition in any manner. + */ + virtual void abort() { m_abort = true; } + + /** Test if a request to abort the task has been made. + * + * @return true if abort() has been called, false otherwise. + */ + bool isAborted() const { return m_abort; } + /** Returns true if this task is executing in a detached thread. Memory * resources associated with a detached thread are automatically reclaimed * when the thread exits, but other threads cannot join a detached thread. @@ -166,7 +179,7 @@ /** Initialize the task state. */ - Task(): m_running(false), m_detached(false), m_complete(false), m_thread_id(0) { + Task(): m_running(false), m_detached(false), m_complete(false), m_abort(false), m_thread_id(0) { } virtual ~Task() { } @@ -198,12 +211,12 @@ /** Begin execution of the task. */ void _execute() { - std::cout << getName() << " start\n"; m_running = true; m_thread_id = pthread_self(); run(); - std::cout << getName() << " exit\n"; m_complete = true; + // signal any threads waiting to join with us + m_exit.signalAll(); } /** Mark this task as running in a detached thread. @@ -214,11 +227,24 @@ */ void setName(std::string const &name) { m_name = name; } + /** Join this task's thread by waiting for an internal condition variable that + * signals on exit. Used via ThreadBase. + * + * @param timeout the maximum time in seconds to wait for the task to finish. + * @return true if the task ended, false if the timeout expired. 
+ */ + bool join(double timeout) { + ScopedLock<ThreadCondition> lock(m_exit); + return m_exit.wait(timeout); + } + std::string m_name; bool m_running; bool m_detached; bool m_complete; + bool m_abort; ThreadId m_thread_id; + ThreadCondition m_exit; }; @@ -229,22 +255,33 @@ */ class BaseThread: public NonCopyable { + // private and undefined, use BaseThread(Task*, std::string const &) BaseThread(); protected: - BaseThread(Task *task, std::string const &name): m_task(task), m_thread_id(0) { + /** Construct a new thread with a bound task. See Thread<> for details. + * + * @param task a non-null Task pointer to bind to this thread. + * @param name a unique string identifier for this thread. + */ + BaseThread(Task *task, std::string const &name): m_task(task), m_thread_id(0), m_cancelled(false) { assert(task); task->setName(name); } + /** Accessor used by Thread<> to provide type-specialized access to the + * associated task instance. + */ + Ref<Task> const &getTask() const { return m_task; } + public: + /** Destroy a thread instance and free internal resources. * * The thread will be cancelled if it is running and not detached. */ virtual ~BaseThread() { - std::cout << "~BaseThread " << m_task->getName() << "\n"; if (isActive() && !isDetached()) { cancel(); } @@ -259,19 +296,18 @@ ThreadException::check(result); } - /* - bool join(int sec, int ns) { - pthread_joinoption_np_t joinoption; - memset(&joinoption, 0, sizeof(pthread_joinoption_np_t)); - joinoption.deltatime.tv_sec = sec; - joinoption.deltatime.tv_ns = ns; - joinoption.leaveThreadAllocated = 1; - int result = pthread_extendedjoin_np(m_thread_id, 0, &joinoption); - if (result == 0) return true; - if (result == ETIMEDOUT) return false; - throw ThreadException(result); + /** Wait for at most a limited amount of time for this thread to finish. + * + * @param timeout the maximum time in seconds to wait for the task to finish. + * @return true if the task ended, false if the timeout expired. 
+ */ + bool join(double timeout) { + assert(!isDetached()); + if (isActive()) { + return m_task->join(timeout); + } + return false; } - */ /** Detach the thread. Once detached, the resources of the thread will * be reclaimed automatically when the thread exits. Note that you @@ -294,11 +330,22 @@ * cancellation checkpoint (e.g. by calling testCancel). */ void cancel() { + if (m_cancelled) return; assert(isStarted()); const int result = pthread_cancel(m_thread_id); ThreadException::check(result); + m_cancelled = true; + //m_thread_id = 0; } + /** Signal the associated task to abort. This is an advisory request; + * the task is free to ignore or honor it in any manner. Provides a + * gentler alternative to cancel(). + */ + void abort() { + if (isStarted()) m_task->abort(); + } + /** Start the thread running. Calls run() in the new thread and returns * immediately. */ @@ -336,6 +383,7 @@ private: Task::ThreadId m_thread_id; Ref<Task> m_task; + bool m_cancelled; }; @@ -389,6 +437,13 @@ */ Thread(std::string const &name): BaseThread(new TASK(), name) { } + /** Get the associated task instance. + * + * Requires an internal downcast from Task to the template type, so + * store a local copy if this overhead is a concern. 
+ */ + Ref<TASK> getTask() const { return BaseThread::getTask(); } + }; NAMESPACE_SIMDATA_END Modified: trunk/CSP/SimData/Include/SimData/ThreadBase.h =================================================================== --- trunk/CSP/SimData/Include/SimData/ThreadBase.h 2004-06-23 06:35:25 UTC (rev 1064) +++ trunk/CSP/SimData/Include/SimData/ThreadBase.h 2004-06-23 16:47:53 UTC (rev 1065) @@ -72,6 +72,9 @@ if (result != 0) throw ThreadException(result); } + int getError() const { return m_error; } + std::string getMessage() const { return m_message; } + private: void translate(const int error) { @@ -87,7 +90,7 @@ case EPERM: m_error = DENIED; default: - m_error = INTERNAL; + m_error = error + 1000; //INTERNAL; } } @@ -296,7 +299,7 @@ * signals the condition variable, one (or all) of the threads waiting * on that variable will be activated. */ -class ThreadCondition { +class ThreadCondition: public NonCopyable { public: /** Construct a condition variable, using an internal mutex. @@ -403,8 +406,6 @@ pthread_cond_t m_cond; pthread_mutex_t m_local_mutex; pthread_mutex_t &m_mutex; - ThreadCondition(const ThreadCondition &); - ThreadCondition &operator=(const ThreadCondition &); }; Modified: trunk/CSP/SimData/Include/SimData/ThreadQueue.h =================================================================== --- trunk/CSP/SimData/Include/SimData/ThreadQueue.h 2004-06-23 06:35:25 UTC (rev 1064) +++ trunk/CSP/SimData/Include/SimData/ThreadQueue.h 2004-06-23 16:47:53 UTC (rev 1065) @@ -63,20 +63,33 @@ m_notFull.unlock(); m_notEmpty.signal(); } - TYPE get(bool block = true) { + bool get(TYPE &item, bool block = true) { m_notEmpty.lock(); while (empty()) { if (!block) { m_notEmpty.unlock(); - throw Empty(); + return false; } m_notEmpty.wait(); } - TYPE item = popItem(); + item = popItem(); m_notEmpty.unlock(); m_notFull.signal(); - return item; + return true; } + bool get(TYPE &item, double timeout) { + m_notEmpty.lock(); + while (empty()) { + if (!m_notEmpty.wait(timeout)) { + 
m_notEmpty.unlock(); + return false; + } + } + item = popItem(); + m_notEmpty.unlock(); + m_notFull.signal(); + return true; + } protected: ThreadSafeQueueBase(int maxsize) : m_mutex(), m_notEmpty(m_mutex), m_notFull(m_mutex), m_maxsize(maxsize) {} @@ -103,7 +116,8 @@ m_queue.push(item); } virtual TYPE popItem() { - TYPE item = m_queue.top(); + assert(size() > 0); + TYPE item = m_queue.front(); m_queue.pop(); return item; } @@ -124,7 +138,8 @@ m_queue.push(item); } virtual TYPE popItem() { - TYPE item = m_queue.top(); + assert(size() > 0); + TYPE item = m_queue.front(); m_queue.pop(); return item; } |