C++17 In Detail

02 December 2019

Threading in C++17: Loopers & Dispatchers


Multithreading is a tough nut in software development, not just because there are dozens of ways to approach a single problem, but also because one can get so many things wrong.

In this article, I want to present how to realize the concept of a Looper with Dispatchers in C++17.

This article is a guest post by Marc-Anton Boehm-von Thenen:

Marc (a.k.a. DottiDeveloper) is a freelance software engineer, primarily working with C++ and C#, as well as a lecturer in the games and graphics programming environment.
He has been developing his spare-time, playground project “ShirabeEngine” for more than 2.5 years, which made C++ the primary aspect of his everyday job and passion.
But wait, there’s more: He is a passionate dad, loves music, drawing and painting as well as creating analogue learning games for school children (more on this publicly in March 2020).
Check out his GitHub, XING, LinkedIn and Twitter profiles and his blog at http://craft-deploy.it/

Introduction

Many widely used systems are based on the looper paradigm, even though each of them expresses it differently for its own use case.

Just to name a few:

Android OS - Loopers as a message queue and one or more Handler types, depending on the specific message.
(See: https://developer.android.com/reference/android/os/Looper )

Qt Framework - Also uses a message queue, upon which the signal and slot mechanism is built to signal across thread boundaries.
(See: https://doc.qt.io/qt-5/signalsandslots.html and https://woboq.com/blog/how-qt-signals-slots-work.html )

**Windowing systems** with a UI thread and event callbacks.

Most Game-Loops in game engines (even though they might not be reusable components), which attach to the main thread and hook into operating system specific event systems - the classic WINAPI-hooks (admit it, you know them ;) )

Let’s examine the concept.

The problem: Executing long(er) running tasks on worker threads

Usually, it is no problem to start a thread and execute a function on it, e.g. using C++11’s <thread> and std::thread:

#include <thread>
#include <iostream>
#include <stdint.h>

void work(uint32_t const &aNumberOfIterations)
{
    for(uint32_t k=0; k<aNumberOfIterations; ++k)
    {
        std::cout << "I am a manifestation of an iteration\n";
    }

    // Thread will terminate here.
}

// ...
std::thread worker(work, 10u); // ATTENTION: Will start immediately! The argument is copied into the thread.
worker.join(); // Block and wait for completion 
// ...
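
Since std::thread copies its arguments into the new thread, handing a real reference to the thread function needs std::ref. The following is a minimal sketch of that; the names accumulate and runAccumulate are hypothetical and only serve the illustration:

```cpp
#include <cstdint>
#include <functional>
#include <thread>

// Hypothetical worker function: adds up the iteration indices into aResult.
void accumulate(uint32_t const &aNumberOfIterations, uint64_t &aResult)
{
    for(uint32_t k = 0; k < aNumberOfIterations; ++k)
    {
        aResult += k;
    }
}

// Hypothetical helper: runs accumulate on a worker thread and returns the sum.
uint64_t runAccumulate(uint32_t aNumberOfIterations)
{
    uint64_t result = 0;

    // Arguments are copied into the thread by default; std::ref is
    // required to pass 'result' as an actual reference.
    std::thread worker(accumulate, aNumberOfIterations, std::ref(result));
    worker.join(); // After join(), reading 'result' is safe.

    return result;
}
```

Waiting with join() right after starting the thread keeps the sketch simple, but it also shows the limitation discussed next: the calling thread is blocked while the work is done.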

So, why not use it everywhere and be happy?

Well, threads are not for free.

At the very least, a stack is allocated for each thread. All threads have to be managed with respect to the governing process in kernel space and the operating system implementation. Also, with a large number of threads, scalability will almost certainly become a critical factor, given the huge number of permutations of target systems.

And even worse, the specific expression of a thread depends on the operating system and the threading library used.

See:
https://eli.thegreenplace.net/2016/c11-threads-affinity-and-hyperthreading/

Finally, we hardly have any control over the threads and their execution.

  • Are things executed in proper order?
  • Who maintains the threads?
  • How to receive results from asynchronous execution?
  • What about task priorities or delayed insertions?
  • Maybe even event-driven dispatching?

As long as we don’t have coroutines and executors, let’s look at another way to approach thread reusability and controlled threading.

May I introduce: Loopers

Loopers, at their core, are objects which contain or are attached to a thread with a conditionally infinite loop that runs as long as the abort criterion is unmet. Within this loop, arbitrary actions can be performed.
Usually, methods like start, run and stop are provided.

Let’s derive an example class in three steps.

Wrapping a thread
First things first, we define the CLooper class, which contains a std::thread member and a run method. The latter creates the thread and invokes runFunc - our second method - which implements the effective thread operation.

#include <thread>
#include <atomic>
#include <memory>
#include <functional>
#include <stdexcept>

class CLooper
{
public:
    CLooper() 
    { }
    // Copy denied, Move to be implemented

    ~CLooper()
    {
    }

    // To be called, once the looper should start looping.
    bool run()
    {
        try 
        {
            mThread = std::thread(&CLooper::runFunc, this);
        }
        catch(...) 
        {
            return false;
        }

        return true;
    }

private:
    void runFunc() 
    {
        // Thread function
    }

private:
    std::thread mThread;
};

Running the infinite loop
Then, we add the infinite loop to the looper implementation as well as an atomic flag mRunning and a corresponding getter running() indicating whether the looper is running or not.

public: // Methods
    bool running() const 
    {
        return mRunning.load();
    }

private: // Methods
    // Conditionally-infinite loop doing sth. iteratively
    void runFunc() 
    {
        mRunning.store(true);

        while(true)
        {
            try
            {
                // Do something...
            }
            catch(std::runtime_error& e) 
            {
                // Some more specific
            }
            catch(...) 
            {
                // Make sure that nothing leaves the thread for now...
            }
        }

        mRunning.store(false);
    }

private: // Members
    std::atomic_bool mRunning{false}; // Initialized explicitly; before C++20, a default-constructed atomic is uninitialized

Stopping the looper cleanly
In order to stop the looper, however, we need a few more methods.

We add an abort criterion to the infinite loop - mAbortRequested - of type std::atomic<bool>, which is checked in each iteration.

We also add a private method abortAndJoin(), which sets the mAbortRequested flag to true and invokes join() on the thread, thereby waiting until the looper function has exited and the worker thread has been joined. The destructor also invokes abortAndJoin(), in case the looper goes out of scope while still running.

The public method stop() serves as a public API handle to control the looper.

public: // Ctor/Dtor
    ~CLooper()
    {
        abortAndJoin();
    }

public: // Methods
    void stop()
    {
        abortAndJoin();
    }

private: // Methods
    // Conditionally-infinite loop doing sth. iteratively
    void runFunc() 
    {
        mRunning.store(true);

        // We now check against abort criteria
        while(false == mAbortRequested.load())
        {
            try
            {
                // Do something...
            }
            catch(std::runtime_error& e) 
            {
                // Some more specific
            }
            catch(...) 
            {
                // Make sure that nothing leaves the thread for now...
            }
        }

        mRunning.store(false);
    }

    // Shared implementation of exiting the loop-function and joining 
    // to the main thread.
    void abortAndJoin()
    {
        mAbortRequested.store(true);
        if(mThread.joinable())
        {
            mThread.join();
        }
    }

private: // Members
    std::atomic_bool mAbortRequested{false}; // Initialized explicitly; before C++20, a default-constructed atomic is uninitialized

This basic construct can now be used as follows:

auto looper = std::make_unique<CLooper>();

std::cout << "Starting looper" << std::endl;
// To start and run
looper->run();

std::this_thread::sleep_for(std::chrono::seconds(5));

std::cout << "Stopping looper" << std::endl;
// To stop it and clean it up
looper->stop();
looper = nullptr;
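
For reference, the three steps so far can be condensed into one compilable unit. This is only a sketch under a few assumptions: the class name CMiniLooper, the iteration counter, and the guard against calling run() twice are illustrative additions and not part of the class derived above.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Condensed sketch of the wrap/loop/stop steps. CMiniLooper, mIterations and
// the double-start guard in run() are hypothetical additions for illustration.
class CMiniLooper
{
public:
    CMiniLooper() = default;
    CMiniLooper(CMiniLooper const &) = delete;            // Copy denied
    CMiniLooper &operator=(CMiniLooper const &) = delete;

    ~CMiniLooper() { abortAndJoin(); }

    bool run()
    {
        if(mThread.joinable())
        {
            // Assigning to a joinable std::thread would call std::terminate.
            return false;
        }
        try
        {
            mThread = std::thread(&CMiniLooper::runFunc, this);
        }
        catch(...)
        {
            return false;
        }
        return true;
    }

    void stop() { abortAndJoin(); }

    bool running() const { return mRunning.load(); }
    unsigned long long iterations() const { return mIterations.load(); }

private:
    void runFunc()
    {
        mRunning.store(true);
        while(false == mAbortRequested.load())
        {
            ++mIterations; // Stand-in for "Do something..."
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        mRunning.store(false);
    }

    void abortAndJoin()
    {
        mAbortRequested.store(true);
        if(mThread.joinable())
        {
            mThread.join();
        }
    }

    std::atomic_bool                mRunning{false};
    std::atomic_bool                mAbortRequested{false};
    std::atomic<unsigned long long> mIterations{0};
    std::thread                     mThread;
};
```

Note that running() only becomes true once the worker thread has actually entered runFunc, which may happen slightly after run() has returned.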

Filling it with life: Tasks

The above example implementation, however, is an iterative no-op: it doesn’t do anything.

Let’s build upon it and fill it with life by permitting the looper to execute something.

In the context of loopers, that something is one or more **Tasks**: small executable portions of code sharing a common signature, which can be fetched from an internal collection, e.g. a FIFO queue, and executed on the worker thread.

Let’s start with the definition of a task type by adding this to the CLooper-class:
using Runnable = std::function<void()>;

Next, permit the looper to hold runnables by adding

std::recursive_mutex mRunnablesMutex;
std::queue<Runnable> mRunnables;

to the list of members.
The mutex is required to guard against simultaneous access to the task collection by the worker and dispatching thread.

To fetch the next runnable, provided the queue is not empty, add the function below.

Runnable next()
{
    std::lock_guard guard(mRunnablesMutex); // CTAD, C++17

    if(mRunnables.empty())
    {
        return nullptr;
    }

    Runnable runnable = std::move(mRunnables.front()); // Move instead of copy; the element is popped right after
    mRunnables.pop();

    return runnable;
}

And finally, in order to have the runnables be executed, add the below snippet into runFunc’s try-block.

using namespace std::chrono_literals;
Runnable r = next();
if(nullptr != r)
{
    r();
}
else
{
    std::this_thread::sleep_for(1ms);
}

If there were a means of adding tasks, the looper would now happily process the tasks pushed to the queue.
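
The 1ms sleep above is a simple form of polling. One possible alternative, sketched here under assumptions, is to block on a std::condition_variable until a task arrives or an abort is requested. CBlockingTaskQueue and consumeOnWorker are hypothetical names and not part of CLooper. The sketch also uses a plain std::mutex, since std::condition_variable does not work with std::recursive_mutex.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical helper type, not part of the article's CLooper.
class CBlockingTaskQueue
{
public:
    using Runnable = std::function<void()>;

    void push(Runnable aRunnable)
    {
        {
            std::lock_guard guard(mMutex);
            mRunnables.push(std::move(aRunnable));
        }
        mCondition.notify_one(); // Wake up one waiting consumer.
    }

    void requestAbort()
    {
        {
            std::lock_guard guard(mMutex);
            mAbortRequested = true;
        }
        mCondition.notify_all();
    }

    // Blocks until a task is available or abort was requested. Returns an
    // empty Runnable once abort was requested and the queue is drained.
    Runnable waitAndPop()
    {
        std::unique_lock lock(mMutex);
        mCondition.wait(lock, [this] { return mAbortRequested || !mRunnables.empty(); });

        if(mRunnables.empty())
        {
            return nullptr;
        }

        Runnable runnable = std::move(mRunnables.front());
        mRunnables.pop();
        return runnable;
    }

private:
    std::mutex              mMutex;
    std::condition_variable mCondition;
    std::queue<Runnable>    mRunnables;
    bool                    mAbortRequested = false;
};

// Usage sketch: a consumer thread executes tasks until the queue is aborted.
int consumeOnWorker(int aTaskCount)
{
    CBlockingTaskQueue queue;
    int counter = 0; // Only touched by the consumer thread until join().

    std::thread consumer([&] {
        while(auto task = queue.waitAndPop()) { task(); }
    });

    for(int k = 0; k < aTaskCount; ++k)
    {
        queue.push([&counter] { ++counter; });
    }

    queue.requestAbort();
    consumer.join();
    return counter;
}
```

In this variant, tasks that are already queued when the abort is requested still get executed. Whether that is desirable depends on the use case.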

Accepting work: Dispatchers

The looper still is useless, since no tasks can be pushed to the queue.

The final concept to solve this is the Dispatcher.

Imagine the dispatcher to be a bouncer in front of the looper.
It will accept a task, but it decides how the task is inserted into the working queue.
This way, some fancy usage scenarios can be enabled, e.g. delayed execution or immediate posting.

In this blog post, however, I will elaborate on regular FIFO insertion only.

Let’s briefly describe the dispatcher class as a nested class in CLooper, placed below the alias Runnable.

public:
    using Runnable = std::function<void()>;

    class CDispatcher
    {
        friend class CLooper; // Allow the looper to access the private constructor.

    public: 
       // Yet to be defined method, which will post the runnable 
       // into the looper-queue.
       bool post(CLooper::Runnable &&aOther);

    private: // construction, since we want the looper to expose its dispatcher exclusively!
        CDispatcher(CLooper &aLooper)
            : mAssignedLooper(aLooper)
       {}

    private:
       // Store a reference to the attached looper in order to 
       // emplace tasks into the queue.
       CLooper &mAssignedLooper;
    };

With this definition given, we add a std::shared_ptr<CDispatcher> mDispatcher; in CLooper and add mDispatcher(std::shared_ptr<CDispatcher>(new CDispatcher(*this))) to the constructor’s initialization-list.

Remark:
The std::shared_ptr<T>-constructor is required over std::make_shared, since the constructor of CDispatcher is private and inaccessible from std::make_shared.
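
The remark can be illustrated in isolation. In this small sketch, COwner and CHandle are hypothetical stand-ins for CLooper and CDispatcher:

```cpp
#include <memory>

// Hypothetical stand-in for CDispatcher: only its friend may construct it.
class CHandle
{
    friend class COwner;

public:
    int id() const { return mId; }

private:
    explicit CHandle(int aId)
        : mId(aId)
    {}

    int mId;
};

// Hypothetical stand-in for CLooper.
class COwner
{
public:
    COwner()
        // OK: 'new CHandle(...)' is evaluated here, inside a friend of CHandle.
        : mHandle(std::shared_ptr<CHandle>(new CHandle(42)))
        // Would not compile: std::make_shared constructs the object inside
        // the standard library, which is not a friend of CHandle.
        // : mHandle(std::make_shared<CHandle>(42))
    {}

    std::shared_ptr<CHandle> handle() const { return mHandle; }

private:
    std::shared_ptr<CHandle> mHandle;
};
```

The price of this approach is a second allocation for the control block, which std::make_shared would otherwise merge with the object allocation.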

Next, add the below method into the CLooper-class, in order to retrieve the dispatcher:

std::shared_ptr<CDispatcher> getDispatcher()
{
   return mDispatcher;
}

Next, let’s implement the CDispatcher’s post-method as below:

bool post(CLooper::Runnable &&aRunnable)
{
   return mAssignedLooper.post(std::move(aRunnable));
}

And finally, add this private method to CLooper.

private:
    bool post(Runnable &&aRunnable)
    {
        if(not running())
        {
            // Deny insertion
            return false;
        }

        try
        {
            std::lock_guard guard(mRunnablesMutex); // CTAD, C++17

            mRunnables.push(std::move(aRunnable));
        }
        catch(...) {
            return false;
        }

        return true;
    }

The whole construct can be used as follows now:

auto looper = std::make_unique<CLooper>();

std::cout << "Starting looper" << std::endl;
// To start and run
looper->run();

auto dispatcher = looper->getDispatcher();

std::cout << "Adding tasks" << std::endl;
for(uint32_t k=0; k<500; ++k)
{
    auto task = [k]() // Not const: std::move on a const object silently falls back to a copy
    { 
        std::cout << "Invocation " << k 
                  << ": Hello, I have been executed asynchronously on the looper for " << (k + 1) 
                  << " times." << std::endl;
    };

    dispatcher->post(std::move(task));
}

std::cout << "Waiting 5 seconds for completion" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));

std::cout << "Stopping looper" << std::endl;
// To stop it and clean it up
dispatcher = nullptr;
looper->stop();
looper = nullptr;
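
One caveat about the usage above: post() denies insertion while running() is false, and the flag is set by the worker thread itself, so posts issued immediately after run() may be rejected. A possible way around this, sketched under assumptions, is to wait for the looper to report that it is running before posting. The helper waitUntilRunning is hypothetical and works with any type offering a running() method:

```cpp
#include <chrono>
#include <thread>

// Hypothetical helper: polls aLooper.running() until it returns true
// or until aTimeout has elapsed. Returns whether the looper is running.
template <typename TLooper>
bool waitUntilRunning(TLooper const &aLooper, std::chrono::milliseconds aTimeout)
{
    auto const deadline = std::chrono::steady_clock::now() + aTimeout;

    while(!aLooper.running())
    {
        if(std::chrono::steady_clock::now() >= deadline)
        {
            return false;
        }
        std::this_thread::yield(); // Give the worker thread a chance to start.
    }

    return true;
}
```

It would be called between looper->run() and the first dispatcher->post(), e.g. as waitUntilRunning(*looper, std::chrono::milliseconds(100)). Checking the return value of post() is advisable in any case.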

Working example: @Wandbox

Where to continue from here?

This example code can be improved in a lot of places. It is far from perfect and, I would say, not necessarily even safe.

  • It can be extended using <future> and its std::future and std::promise features to execute asynchronously and receive a result.
  • The dispatcher can be extended to permit priority execution (immediate execution) and delayed execution.
  • The entire looper can be made lock-free.
  • We could attach a messaging system to the looper.
  • We could support handlers and different handler-types for dispatched messages, i.e. functors, which are automatically invoked based on some identifying criteria in the message or being provided by the dispatcher.
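
As a rough idea of the first extension, the following sketch wraps a callable in a std::packaged_task and returns a std::future for its result. The helper postWithResult is hypothetical, as is the assumption that aPost is any callable accepting a Runnable, e.g. a lambda forwarding to dispatcher->post.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <type_traits>
#include <utility>

using Runnable = std::function<void()>; // Same alias as in CLooper.

// Hypothetical helper: wraps aCallable into a Runnable, hands it to aPost
// and returns a future through which the result can be received.
template <typename TPost, typename TCallable>
auto postWithResult(TPost &&aPost, TCallable &&aCallable)
    -> std::future<std::invoke_result_t<std::decay_t<TCallable> &>>
{
    using Result = std::invoke_result_t<std::decay_t<TCallable> &>;

    // std::function requires copyable targets, but std::packaged_task is
    // move-only, hence the std::shared_ptr indirection.
    auto task = std::make_shared<std::packaged_task<Result()>>(
        std::forward<TCallable>(aCallable));

    std::future<Result> result = task->get_future();

    aPost(Runnable([task]() { (*task)(); }));

    return result;
}
```

If the task is dropped without ever being executed, e.g. because the looper denied the insertion, calling get() on the returned future throws a std::future_error (broken promise), so callers should be prepared for that.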

There are many things we can do, some of which we may describe in follow-up posts.

Conclusion

This construct is a good starting point to regain control of your threading and reuse threads while simultaneously reducing threading-overhead.

The design is simple and comprehensible and permits thread-safe dispatching of work-items to a single worker-thread while reducing the spread of thread-dispatches throughout the codebase.

It has its limitations, though!

The looper is just a control construct attached to a single worker thread and cannot handle parallelized execution or workload balancing, which thread pools with work-stealing are perfect for.

But if a single worker thread is required for a specific type of task, the Looper can be a simpler and more comprehensible approach to solving the multithreading issue!

© 2017, Bartlomiej Filipek, Blogger platform