C++17 In Detail

01 June 2020

C++ Lambdas, Threads, std::async and Parallel Algorithms


In articles about lambda expressions (like this one from last week on my page), it’s easy to show examples where the lambda runs on the same thread as the caller. But what about asynchronous cases? What if your lambda is called on a separate thread? What problems might you encounter there?

Read on and let’s explore this topic.

Lambdas with std::thread

Let’s start with std::thread. As you might already know, std::thread accepts a callable object in its constructor. It might be a regular function pointer, a functor, or a lambda expression. A simple example:

std::vector<int> numbers(100);

std::thread iotaThread([&numbers](int startArg) {
    std::iota(numbers.begin(), numbers.end(), startArg);
    std::cout << "from: " << std::this_thread::get_id() << " thread id\n";
    }, 10
);

iotaThread.join();
std::cout << "numbers in main (id " << std::this_thread::get_id() << "):\n";
for (auto& num : numbers)
    std::cout << num << ", ";

In the above sample, we create a single thread with a lambda expression. The std::thread class has a flexible constructor, so we can also pass values for the callable’s arguments. In our code, 10 is passed into the lambda as startArg.

The code is simple because we can control the thread execution, and by joining it, we know that the results of the iota will be ready before we print them.

The important thing to remember is that while lambdas make it easy and convenient to create a thread, the execution is still asynchronous. All the issues that you might hit when passing a regular function apply here as well.

This is visible in the following example:

int counter = 0;

std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
    threads.push_back(std::thread([&counter]() {
        for (int i = 0; i < 100; ++i) {
            ++counter;
            --counter;
            ++counter;
        }
        }));
}

for (auto& thread : threads) {
    thread.join();
}

std::cout << counter << std::endl;

We’re creating five threads, and each thread performs super-advanced computations on the counter variable which is shared among all the threads.

While you might expect the final value of counter to be 500, the result is undefined. During my testing on Visual Studio 2019, I got the following outputs (running it several times):

500
400
403
447

To fix the issue, as with regular threading scenarios, we should use some sort of synchronisation mechanism. For this example, atomics are probably the easiest to use and the fastest option.

std::atomic<int> counter = 0;

std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
    threads.push_back(std::thread([&counter]() {
        for (int i = 0; i < 100; ++i) {
            counter.fetch_add(1);
        }
    }));
}

for (auto& thread : threads) {
    thread.join();
}

std::cout << counter.load() << std::endl;

The code above works as expected because the increment operation is now atomic. That means counter is incremented without other threads being able to interrupt the operation. Without any form of synchronisation, two threads might read the current value of counter at the same time and then both increment it, making the final value undefined. Synchronisation makes the code safer, but at the price of performance. That, however, is a topic for a much longer discussion.

As we can see, it’s quite handy to create a thread with a lambda expression. The lambda is defined locally, right next to the code that starts the thread, and you can do everything you could with a regular function or functor object.

And now the question to you: Do you use lambdas for threads?

Quite often, threading code is much more complicated than three or five lines of lambda code. In that context, maybe it’s better to write a separate thread function? What do you think? Do you have any rules on that?

Let’s now try another technique that is available in C++.

Lambdas with std::async

A second way that you can leverage multithreading is through std::async. We got that functionality together with threads in C++11. This is a high-level API that allows you to set up and call computations lazily or fully asynchronously.

Let’s convert our example with iota into the async call:

std::vector<int> numbers(100);

std::future<void> iotaFuture = std::async(std::launch::async, 
    [&numbers, startArg = 10]() {
        std::iota(numbers.begin(), numbers.end(), startArg);
        std::cout << "calling from: " << std::this_thread::get_id() 
                  << " thread id\n";
    }
);

iotaFuture.get(); // make sure we get the results...
std::cout << "numbers in main (id " << std::this_thread::get_id() << "):\n";
for (auto& num : numbers)
    std::cout << num << ", ";

This time rather than threads, we rely on the mechanism of std::future. This is an object which handles the synchronisation and guarantees that the results of the invocation are ready.

In our case, we schedule the execution of the lambda through std::async, and then we call .get() to wait for the computation to finish. The .get() member function is blocking.

However, the code above cheats a bit, as we use future<void> and the vector is still captured by reference in the lambda. As an alternative, you might want to create a std::future<std::vector<int>>:

std::future<std::vector<int>> iotaFuture = std::async(std::launch::async, 
    [startArg = 10]() {
        std::vector<int> numbers(100);
        std::iota(numbers.begin(), numbers.end(), startArg);
        std::cout << "calling from: " << std::this_thread::get_id() 
                  << " thread id\n";
        return numbers;
    }
);

auto vec = iotaFuture.get(); // make sure we get the results...

Let’s make a stop here.

While the above code should work, over the years std::async/std::future have earned a mixed reputation. It looks like the functionality was rushed a bit. It works for relatively simple cases but falls short in advanced scenarios:

  • no support for continuations
  • no task merging
  • no cancellation or detached joining
  • std::future is not a regular type
  • and a few other issues

I’m not using this framework in production, so I won’t pretend I’m an expert here. If you want to know more, there are several good talks and articles that cover these limitations in depth.

Lambdas and Parallel Algorithms from C++17

After discussing the threading support in C++11, let’s move to a newer standard: C++17. This time, you have a super-easy-to-use technique that allows you to parallelise most of the algorithms from the Standard Library. All you have to do is pass an execution policy as the first argument to the algorithm, for example:

auto myVec = GenerateVector();
std::sort(std::execution::par, myVec.begin(), myVec.end());

We have the following execution policies, all defined in the <execution> header:

  • sequenced_policy (std::execution::seq): a unique type used to disambiguate parallel algorithm overloads; it requires that the algorithm’s execution is not parallelised.
  • parallel_policy (std::execution::par): indicates that the algorithm’s execution may be parallelised.
  • parallel_unsequenced_policy (std::execution::par_unseq): indicates that the algorithm’s execution may be parallelised and vectorised.

For example, we can quickly come up with the following (bad/suspicious) code that performs a copy with a filter:

std::vector<int> vec(1000);
std::iota(vec.begin(), vec.end(), 0);
std::vector<int> output;
std::for_each(std::execution::par, vec.begin(), vec.end(),
    [&output](int& elem) {
        if (elem % 2 == 0) {
            output.push_back(elem);
        }
});

Do you see all the issues here?

We can fix the synchronisation problem by introducing a mutex and locking it around each push_back. But is that code still efficient? If the filter condition is straightforward and fast to execute, you might even get worse performance than the serial version.

Not to mention that, by running it in parallel, you don’t know the order of the copied elements in the output vector.

So while parallel algorithms are relatively easy to use - just pass an extra first argument and you get the speed... it’s easy to forget that you’re still working with parallel code, and all the safety rules still apply.

Capturing this

Before we finish, it’s also important to mention one more topic: how to capture this pointer.

Have a look at the following code:

struct User {
    std::string _name;

    auto getNameCallback() {
        return [this](const std::string& b) { 
            return _name + b; 
        };
    }
};

void callbackTest() {
    auto pJohn = std::make_unique<User>(User{ "John" });
    auto nameCallback = pJohn->getNameCallback();
    pJohn.reset();

    const auto newName = nameCallback(" is Super!");
    std::cout << newName << '\n';
}

Do you know what happens when we try to call nameCallback() ?

.

.

.

It’s Undefined Behaviour!

For example, in my debugging session in Visual Studio, I got an exception.

Exception thrown at 0x00007FFC0323A799 in cpptests.exe: 
Microsoft C++ exception: std::bad_alloc at memory location 0x000000F77DEFEF20.

This happens because in nameCallback we try to access a member of the User structure. However, since the object was deleted (via pJohn.reset()), we’re accessing freed memory.

In this case, we can fix our code by using the C++17 feature that allows capturing *this as a copy of the instance. That way, even if the original object is destroyed, the lambda still holds a safe copy.

One note: the copy is made when you create the lambda object, not at the place where you invoke it! (Thanks to JFT for pointing this out in the comments.)

struct User {
    std::string _name;

    auto getSafeNameCallback() {
        return [*this](const std::string& b) { // *this!
            return _name + b; 
        };
    }
};

void callbackTest() {
    auto pJohn = std::make_unique<User>(User{ "John" });
    auto nameCallback = pJohn->getSafeNameCallback();
    pJohn.reset();

    const auto newName = nameCallback(" is Super!");
    std::cout << newName << '\n';
}

Now, after the changes, the code works as expected.

Of course, it’s not always possible to change code like that. In many cases, you don’t want a copy. One place where I can see this is callbacks for UI events. For Qt, I’ve seen code like this:

QPushButton* button = new QPushButton(text);        
connect(button, &QPushButton::clicked, [this]() {
                // do something on "this"
            }
        );

You need to pay attention and make sure that the instance of the object is still alive when the lambda is invoked.

Summary

Throughout this blog post, I wanted to make you aware of the potentially harmful situations with captured variables: use after delete and synchronisation issues.

Lambdas make it easy to capture objects from the outside scope, and thus you can easily forget to apply a proper synchronisation mechanism to those captured objects, or simply to check that the referenced object is still alive. However, if you happen to write threading code as a separate function, it’s harder to “capture” variables, and that can make you more aware of the synchronisation issues.

One thing to consider is also the readability of the code. If your thread body is just several lines of code, it’s probably fine, but what about more complex logic? Do you prefer regular functions or functors then?

You can play with my sample code at @Coliru - as it supports 4 cores for threading (std::thread::hardware_concurrency()).

Back to you

  • Do you use lambdas with threads or async?
  • Have you tried parallel algorithms?
  • What do you think about the readability of lambdas vs regular functions for threads?


© 2017, Bartlomiej Filipek, Blogger platform