Menu

Code examples

Andrey Skvortsov

1. Asynchronous function call

#include <cstdlib>

#include <iostream>
#include <memory>

#include "posix_threads_thread.hpp"
#include "posix_process.hpp"

/**
 * Entry point executed by the active object (thread or process).
 *
 * Prints its own decorated signature to standard output.
 *
 * @param param opaque user argument; not used by this example.
 * @return always nullptr. The function is declared to return void*, so it
 *         must return a value: flowing off the end would be undefined
 *         behaviour.
 */
void* asyncFunction(void* param)
{
    std::cout << __PRETTY_FUNCTION__ << std::endl;
    return nullptr;
}

// Values that MODE can take; they are compared in the #if inside main().
#define PROCESS 0
#define THREAD 1

// Compile-time switch: exactly one of the two MODE definitions below should
// be active. Swap the comment marker to run the example with the other one.
//#define MODE PROCESS
#define MODE THREAD

int
main(int argc, char** argv)
{
    std::unique_ptr<posixcpp::ActiveObject> activeObject;
#if MODE == PROCESS
    activeObject.reset(new posix::Process(asyncFunction));
#else
    activeObject.reset(new posix::threads::Thread(asyncFunction));
#endif

    activeObject->start();
    activeObject->waitFinish();

    std::cout << "Complete" << std::endl;

    return EXIT_SUCCESS;
}

2. Synchronization primitives

2.1 Bounded synchronous queue

#include <cstdlib>

#include <iostream>
#include <queue>

#include "posix_threads_thread.hpp"
#include "posix_threads_mutex.hpp"
#include "posix_threads_condition.hpp"
#include "posix_threads_criticalsection.hpp"

/**
 * FIFO queue with a fixed maximum number of elements.
 *
 * enqueue() blocks while the queue already holds m_capacity elements and
 * dequeue() blocks while it is empty. All access to the underlying
 * std::queue happens inside a CriticalSection on m_lock.
 *
 * @tparam T element type; it is copied into and out of the queue.
 */
template <typename T>
class BoundedSyncBlockingQueue
{
public:

    /**
     * @param capaciy maximum number of elements the queue may hold.
     */
    explicit BoundedSyncBlockingQueue(size_t capaciy)
        : m_fullCond(m_lock)
        , m_emptyCond(m_lock)
        , m_capacity(capaciy)
    {
    }

    /**
     * Appends a copy of item, waiting on the "full" condition for as long
     * as the queue is at capacity.
     *
     * @param item value to append.
     */
    void enqueue(const T& item)
    {
        posix::threads::CriticalSection guard(m_lock);

        // Re-check after every wake-up: another producer may have filled
        // the free slot in the meantime.
        while (!(m_baseQueue.size() < m_capacity)) {
            m_fullCond.wait();
        }

        const bool wasEmpty = m_baseQueue.empty();
        m_baseQueue.push(item);

        // Consumers can only be waiting if the queue was empty, so the
        // empty -> non-empty transition is the only one worth announcing.
        if (wasEmpty) {
            m_emptyCond.broadcast();
        }
    }

    /**
     * Removes and returns the oldest element, waiting on the "empty"
     * condition for as long as there is nothing to take.
     *
     * @return a copy of the element that was at the front of the queue.
     */
    T dequeue()
    {
        posix::threads::CriticalSection guard(m_lock);

        while (m_baseQueue.empty()) {
            m_emptyCond.wait();
        }

        const bool wasFull = (m_baseQueue.size() == m_capacity);

        T head = m_baseQueue.front();
        m_baseQueue.pop();

        // Producers can only be waiting if the queue was full, so the
        // full -> not-full transition is the only one worth announcing.
        if (wasFull) {
            m_fullCond.broadcast();
        }

        return head;
    }

    /**
     * @return the number of elements currently stored, read under the lock.
     */
    size_t getSize() const
    {
        posix::threads::CriticalSection guard(m_lock);

        return m_baseQueue.size();
    }

private :

    // Declared first so that it is constructed before the two conditions
    // that are built from it; mutable so getSize() const can lock it.
    mutable posix::threads::Mutex m_lock;

    // Waited on by producers while the queue is at capacity.
    posix::threads::Condition m_fullCond;

    // Waited on by consumers while the queue is empty.
    posix::threads::Condition m_emptyCond;

    // Upper bound on m_baseQueue.size().
    const size_t m_capacity;

    // Underlying unsynchronized storage.
    std::queue<T> m_baseQueue;
};

// Held by Reader and Writer around each std::cout statement so that the
// output of the two threads is not interleaved.
static posix::threads::Mutex stdoutLock;

/**
 * Consumer thread: takes five items from the shared queue and prints each
 * one as "Get <value>".
 */
class Reader : public posix::threads::Thread
{
public:

    /**
     * @param q queue to read from; only a reference is stored, so the queue
     *          must outlive this object.
     */
    explicit Reader(BoundedSyncBlockingQueue<int>& q) :
    m_queue(q)
    {
    }

private:

    BoundedSyncBlockingQueue<int>& m_queue;

// posixcpp::ActiveObjectInterface
public:

    /**
     * Thread body: dequeues and prints five items.
     */
    void run() override
    {
        for (int i = 0; i < 5; ++i) {
            const int item = m_queue.dequeue();

            // Scoped guard instead of manual lock()/unlock(): the mutex is
            // released at the end of each iteration on every exit path
            // (assuming CriticalSection unlocks in its destructor, as its
            // use in BoundedSyncBlockingQueue suggests).
            posix::threads::CriticalSection cs(stdoutLock);
            std::cout << "Get " << item << std::endl;
        }
    }
};

/**
 * Producer thread: once per sleep interval, puts the values 0..9 into the
 * shared queue and prints each one as "Put <value>".
 */
class Writer : public posix::threads::Thread
{
public:

    /**
     * @param q queue to write to; only a reference is stored, so the queue
     *          must outlive this object.
     */
    explicit Writer(BoundedSyncBlockingQueue<int>& q) :
    m_queue(q)
    {
    }

private:

    BoundedSyncBlockingQueue<int>& m_queue;

// posixcpp::ActiveObjectInterface
public:

    /**
     * Thread body: enqueues ten consecutive integers, pausing before each.
     */
    void run() override
    {
        // Plain int counter: it is streamed as a number without a cast and
        // does not depend on <cstdint> being included.
        for (int i = 0; i < 10; ++i) {
            posix::time::sleep(1);
            m_queue.enqueue(i);

            // Scoped guard instead of manual lock()/unlock(): the mutex is
            // released at the end of each iteration on every exit path
            // (assuming CriticalSection unlocks in its destructor, as its
            // use in BoundedSyncBlockingQueue suggests).
            posix::threads::CriticalSection cs(stdoutLock);
            std::cout << "Put " << i << std::endl;
        }
    }
};

int
main(int argc, char** argv)
{
    BoundedSyncBlockingQueue<int> queue(4);

    Reader r(queue);
    Writer w(queue);

    r.start();
    w.start();

    posix::time::sleep(11);

    try {
        r.cancel();
    } catch (const std::exception& ex) {
        std::cerr << ex.what() << std::endl;
    }

    try {
        w.cancel();
    } catch (const std::exception& ex) {
        std::cerr << ex.what() << std::endl;
    }

    try {
        r.join();
    } catch (const std::exception& ex) {
        std::cerr << ex.what() << std::endl;
    }

    try {
        w.join();
    } catch (const std::exception& ex) {
        std::cerr << ex.what() << std::endl;
    }

    return EXIT_SUCCESS;
}

The output may look like

Put 0
Get 0
Put 1
Get 1
Put 2
Get 2
Put 3
Get 3
Put 4
Get 4
Put 5
Put 6
Put 7
Put 8
Call condition (errno = pthread_cancel(m_handle)) == 0 at ../src/posix_threads_thread.cpp:134 Unable to cancel thread: No such process

The Reader thread has no delay in its loop, but it blocks on the queue's "empty" condition every time it has taken the last element from the queue.
The Writer thread blocks after inserting element 8 (its ninth item), because the Reader takes only 5 elements (0-4) and the queue has a capacity of only 4 elements, so there is no room for the tenth item.