#include <cstdlib>
#include <iostream>
#include <memory>
#include "posix_threads_thread.hpp"
#include "posix_process.hpp"
/**
 * Entry routine handed to the active object created in main().
 *
 * Prints its own signature (via __PRETTY_FUNCTION__) to standard output.
 *
 * @param param opaque argument supplied by the caller; not used here.
 * @return always nullptr. The function is declared to return void*, so it
 *         must return a value: flowing off the end would be undefined
 *         behaviour.
 */
void* asyncFunction(void* param)
{
    (void)param; // intentionally unused
    std::cout << __PRETTY_FUNCTION__ << std::endl;
    return nullptr;
}
// Compile-time selection of the active object implementation used by main():
// PROCESS picks posix::Process, THREAD picks posix::threads::Thread.
#define PROCESS 0
#define THREAD 1
// Uncomment the next line (and comment out the one after it) to run the
// asynchronous function in a process instead of a thread.
//#define MODE PROCESS
#define MODE THREAD
/**
 * Demo entry point: runs asyncFunction in an active object and waits for it.
 *
 * The concrete active object type is chosen at compile time by MODE
 * (posix::Process when MODE == PROCESS, posix::threads::Thread otherwise).
 *
 * @return EXIT_SUCCESS once the active object has finished.
 */
int
main(int argc, char** argv)
{
#if MODE == PROCESS
    std::unique_ptr<posixcpp::ActiveObject> worker(
        new posix::Process(asyncFunction));
#else
    std::unique_ptr<posixcpp::ActiveObject> worker(
        new posix::threads::Thread(asyncFunction));
#endif

    // Launch the asynchronous function, then block until it is done.
    worker->start();
    worker->waitFinish();

    std::cout << "Complete" << std::endl;
    return EXIT_SUCCESS;
}
#include <cstdlib>
#include <iostream>
#include <queue>
#include <utility>
#include "posix_threads_thread.hpp"
#include "posix_threads_mutex.hpp"
#include "posix_threads_condition.hpp"
#include "posix_threads_criticalsection.hpp"
/**
 * FIFO queue with a fixed maximum size, guarded by a single mutex.
 *
 * enqueue() waits while the queue holds `capacity` items and dequeue() waits
 * while it is empty. Both condition objects are constructed on m_lock.
 *
 * NOTE(review): posix::threads::CriticalSection is presumably a scoped guard
 * that holds m_lock until the end of the enclosing scope, and
 * Condition::wait() presumably releases m_lock while waiting; confirm
 * against the library headers.
 *
 * @tparam T element type; must be copy-constructible (enqueue copies the item).
 */
template <typename T>
class BoundedSyncBlockingQueue
{
public:
    /**
     * @param capacity maximum number of stored items. With a capacity of 0
     *        enqueue() never returns, because its wait condition
     *        (size() >= capacity) is always true.
     */
    explicit BoundedSyncBlockingQueue(size_t capacity) :
        m_fullCond(m_lock),
        m_emptyCond(m_lock),
        m_capacity(capacity)
    {
    }

    /**
     * Appends a copy of @p item, waiting while the queue is full.
     */
    void enqueue(const T& item)
    {
        posix::threads::CriticalSection cs(m_lock);
        // Re-check in a loop: the queue may be full again after a wake-up.
        while (m_baseQueue.size() >= m_capacity)
            m_fullCond.wait();
        m_baseQueue.push(item);
        // Only the empty -> non-empty transition can unblock dequeue() callers.
        if (m_baseQueue.size() == 1)
            m_emptyCond.broadcast();
    }

    /**
     * Removes and returns the oldest item, waiting while the queue is empty.
     */
    T dequeue()
    {
        posix::threads::CriticalSection cs(m_lock);
        while (m_baseQueue.empty())
            m_emptyCond.wait();
        // The front element is popped on the next line, so move it out
        // instead of copying it.
        T result = std::move(m_baseQueue.front());
        m_baseQueue.pop();
        // Only the full -> not-full transition can unblock enqueue() callers.
        if (m_baseQueue.size() == (m_capacity - 1))
            m_fullCond.broadcast();
        return result;
    }

    /**
     * @return the number of items stored at the moment of the call.
     */
    size_t getSize() const
    {
        posix::threads::CriticalSection cs(m_lock);
        return m_baseQueue.size();
    }

private:
    // mutable so that the const getSize() can still take the lock.
    mutable posix::threads::Mutex m_lock;
    // Waited on by enqueue() while the queue is full.
    posix::threads::Condition m_fullCond;
    // Waited on by dequeue() while the queue is empty.
    posix::threads::Condition m_emptyCond;
    const size_t m_capacity;
    std::queue<T> m_baseQueue;
};
// Mutex taken by Reader::run() and Writer::run() around their std::cout
// statements, so that the two threads do not write to stdout at the same time.
static posix::threads::Mutex stdoutLock;
class Reader : public posix::threads::Thread
{
public:
explicit Reader(BoundedSyncBlockingQueue<int>& q) :
m_queue(q)
{
}
private:
BoundedSyncBlockingQueue<int>& m_queue;
// posixcpp::ActiveObjectInterface
public:
void run() override
{
for (uint8_t i=0; i<5; ++i) {
int a = m_queue.dequeue();
stdoutLock.lock();
std::cout << "Get " << int(a) << std::endl;
stdoutLock.unlock();
}
}
};
class Writer : public posix::threads::Thread
{
public:
explicit Writer(BoundedSyncBlockingQueue<int>& q) :
m_queue(q)
{
}
private:
BoundedSyncBlockingQueue<int>& m_queue;
// posixcpp::ActiveObjectInterface
public:
void run() override
{
for (uint8_t i=0; i<10; ++i) {
posix::time::sleep(1);
m_queue.enqueue(int(i));
stdoutLock.lock();
std::cout << "Put " << int(i) << std::endl;
stdoutLock.unlock();
}
}
};
int
main(int argc, char** argv)
{
BoundedSyncBlockingQueue<int> queue(4);
Reader r(queue);
Writer w(queue);
r.start();
w.start();
posix::time::sleep(11);
try {
r.cancel();
} catch (const std::exception& ex) {
std::cerr << ex.what() << std::endl;
}
try {
w.cancel();
} catch (const std::exception& ex) {
std::cerr << ex.what() << std::endl;
}
try {
r.join();
} catch (const std::exception& ex) {
std::cerr << ex.what() << std::endl;
}
try {
w.join();
} catch (const std::exception& ex) {
std::cerr << ex.what() << std::endl;
}
return EXIT_SUCCESS;
}
The output may look like this:
Put 0
Get 0
Put 1
Get 1
Put 2
Get 2
Put 3
Get 3
Put 4
Get 4
Put 5
Put 6
Put 7
Put 8
Call condition (errno = pthread_cancel(m_handle)) == 0 at ../src/posix_threads_thread.cpp:134 Unable to cancel thread: No such process
The Reader thread has no delay in its loop, but it blocks on the queue's empty condition every time it takes the last element from the queue.
The Writer thread blocks after inserting element 8 (the ninth item it puts), because the Reader takes only 5 elements (0-4) and the queue has a capacity of only 4 elements.