C++ Concurrency 101

Utilizing the potential of the multi-core CPUs found in virtually all modern hardware requires writing multi-threaded programs. Multi-threaded code allows multiple threads to run through the same piece of code simultaneously on different logical processors. This is possible because each thread has its own stack, while the code segment being executed is read-only; sharing read-only memory is safe. In this article we’ll cover the basics of concurrency as found in Modern C++, which by necessity can’t be the full story as entire books have been written on the subject. Only the high-level concepts and standard library building-blocks will be covered, but hopefully this will provide a starting-point for readers new to either Modern C++ or the topic of concurrency.

There are a number of different ways to write concurrent programs in C++. In previous articles we’ve looked at using parallel algorithms to speed up code, and also the use of coroutines. In this article we’ll look at some of the major threading facilities in the standard library: std::thread, std::async and std::future.

std::thread

Class std::thread is the building-block on which the multi-threaded capabilities of Modern C++ are built, and may itself rely on lower-level threading functionality (such as POSIX pthreads when using GCC on Linux). The simplest use of std::thread is to call a function (with or without parameters; any arguments are copied or moved into the new thread’s storage, so they are effectively passed by value) to be executed in parallel:

#include <thread>

void f() {
    for (;;) {}
}

int main() {
    std::thread t(f);
}

The function f() does nothing except loop forever, and this program does nothing except exit immediately with an error message such as: terminate called without an active exception. This is because a joinable std::thread must be joined or detached before its destructor runs; destroying a thread object while it is still joinable calls std::terminate, which is rarely desirable.

The solution is to call join():

int main() {
    std::thread t(f);
    t.join();
}

This program now runs as expected, being stuck in an infinite loop with no output being produced (admittedly this is rarely desirable either). An alternative is to call detach():

int main() {
    std::thread t(f);
    t.detach();
}

This program exits straightaway as for the first one, except no error message is produced. Instead the thread is quietly terminated when main() returns (for this reason detached threads should not access global objects, as these may be destroyed before the thread ends).

A slightly more interesting scenario is creating multiple threads and then launching them in parallel, as shown in this program:

#include <thread>
#include <mutex>
#include <chrono>
#include <vector>
#include <iostream>

using namespace std::chrono_literals;

class ThreadID {
    std::thread::id id;
    inline static std::mutex mut;
public:
    ThreadID(const std::thread::id id) : id{ id } {
        std::lock_guard<std::mutex> lock(mut);
        std::cerr << "Launched thread: " << id << '\n';
    }
    ~ThreadID() {
        std::lock_guard<std::mutex> lock(mut);
        std::cerr << "Finished thread: " << id << '\n';
    }
};

void f() {
    ThreadID log(std::this_thread::get_id());
    std::this_thread::sleep_for(3000ms);
}

int main() {
    std::vector<std::thread> threads;
    unsigned threadCount = (std::thread::hardware_concurrency() < 2) ? 1 : std::thread::hardware_concurrency() - 1;
    for (unsigned t = 0; t != threadCount; ++t) {
        threads.emplace_back(f);
    }
    for (auto& t : threads) {
        t.join();
    }
}

The class ThreadID contains one data member of type std::thread::id for recording which thread is being launched or destroyed, and a static (class) member of type std::mutex used to lock std::cerr. The code std::lock_guard lock(mut) is boilerplate code which is encountered quite often, and only permits one execution path (out of all parallel processes) to pass through this scope (or sub-scope) at any one time, meaning that output is never interleaved.

In parallel function f() we create a ThreadID object to have the same lifetime as the function, and initialize it with std::this_thread::get_id() which is guaranteed to be unique for each thread. We then pause for three seconds before exiting.

The main() function creates a container of threads and decides how many to create by querying std::thread::hardware_concurrency(). This function returns a hint as to the number of logical parallel processors, or zero if this is not well-defined. By setting threadCount to this value minus one (with a minimum of one), we leave a core free to try to ensure that the user-interface of the operating system does not become unresponsive during execution of the program. (Using fewer threads than available processors is called undersubscription; using more is called oversubscription.)

It is important to create all of the threads before joining any of them in order to facilitate parallel execution. This program should take three seconds to execute regardless of the number of logical processors available, producing output similar to:

Launched thread: 139697060570816
Launched thread: 139697043785408
Launched thread: 139697052178112
(Three second delay)
Finished thread: 139697060570816
Finished thread: 139697043785408
Finished thread: 139697052178112

Importantly, the order of creation and destruction of threads is not usually guaranteed. In addition, std::thread objects can be moved but not copied. As well as the member functions described above there is also joinable() (returning a Boolean indicating whether the thread object represents an active thread of execution, i.e. one which has not yet been joined or detached) and native_handle() (returning the thread’s OS-level handle). Since C++20 the class std::jthread has also been available, which automatically joins on destruction (thereby avoiding a call to std::terminate) and supports cooperative cancellation via std::stop_token.

std::async

Manual organization of thread creation can be tedious and error-prone, and so far we have not described how to return a value from a std::thread (it isn’t directly possible; we need futures and promises, described later). The functionality provided by std::async uses the same hardware and library features as std::thread; in other words, it is no less performant. It also allows for deferred launch and return of a result by value.

The following program populates a std::vector with the first billion integers and then calculates the sum using four parallel calls to std::accumulate. Note that having more than four logical processors available gives no speed advantage in this instance, and that using a parallel version of std::accumulate directly would usually be a better option:

#include <future>
#include <thread>
#include <vector>
#include <algorithm>
#include <numeric>
#include <iostream>

const int sumSize = 1'000'000'000;

int main() {
    std::vector<int> summation(sumSize);
    std::iota(summation.begin(), summation.end(), 1);

    auto result1 = std::async([&]{ return std::accumulate(summation.begin(), summation.begin() + summation.size() / 4, 0LL); });
    auto result2 = std::async([&]{ return std::accumulate(summation.begin() + summation.size() / 4, summation.begin() + summation.size() / 2, 0LL); });
    auto result3 = std::async([&]{ return std::accumulate(summation.begin() + summation.size() / 2, summation.begin() + summation.size() * 3 / 4, 0LL); });
    auto result4 = std::async([&]{ return std::accumulate(summation.begin() + summation.size() * 3 / 4, summation.end(), 0LL); });
    auto sum = result1.get() + result2.get() + result3.get() + result4.get();

    std::cout << "Sum of first " << sumSize << " numbers is: " << sum << '\n';
}

In this code each std::async invocation is given a lambda function to execute, although other callables can be used. Each lambda returns the result of a call to std::accumulate over a separate portion of the summation container. It is assumed here that each call to std::async launches a new thread (as and when available) whose result can be retrieved with the member function get(); therefore the above code blocks at the initialization of the variable sum. (A launch policy can also be supplied as the first parameter to std::async: std::launch::async or std::launch::deferred; the default behaves as if both were specified, leaving the choice to the implementation.)

std::future

A std::future<T> object has a member function get() which returns a value of type T. (This is in fact the return type of a call to std::async.) Such an object can also be obtained with a call to get_future() on a std::promise<T> within the caller code. The std::promise<T> is itself populated from within the callee code (the thread) with a call to set_value(); each thread is passed a reference to its promise at creation. The call to get() blocks until this value has been set. The following program has the same result as that for std::async, except it uses all of the available parallel processing capability:

#include <future>
#include <thread>
#include <vector>
#include <iostream>

void f(std::promise<long long>& p, int begin, int end) {
    long long sum{};
    for (int value = begin; value != end; ++value) {
        sum += value + 1;
    }
    p.set_value(sum);
}

int main() {
    const int sumSize = 1'000'000'000;
    unsigned threadCount = (std::thread::hardware_concurrency() < 2) ? 1 : std::thread::hardware_concurrency();
    std::vector<std::promise<long long>> promises(threadCount);
    std::vector<std::future<long long>> futures(threadCount);
    std::vector<std::thread> threads;

    int portionBegin{}, portionEnd{};
    for (unsigned t = 0; t != threadCount; ++t) {
        // The last portion absorbs any remainder of the integer division
        portionEnd = (t == threadCount - 1) ? sumSize : portionBegin + sumSize / threadCount;
        futures.at(t) = promises.at(t).get_future();
        threads.emplace_back(f, std::ref(promises.at(t)), portionBegin, portionEnd);
        portionBegin = portionEnd;
    }
    
    for (auto& t : threads) {
        t.join();
    }
    long long sum{};
    for (auto& f : futures) {
        sum += f.get();
    }
    std::cout << "Sum of first " << sumSize << " numbers is: " << sum << '\n';
}

The function f() takes a reference to a std::promise<long long> as its first parameter as well as a begin and end value to accumulate to variable sum. The function ends with a call to set_value() and the return type is void.

The main() function obtains threadCount (this time using all available logical processors) and then creates three containers of type std::promise<long long>, std::future<long long> and std::thread. The variables portionBegin and portionEnd keep a running range of values to pass to f(). Each new thread is created with a reference to the (empty) promise, and each future is assigned the result of get_future() on this promise.

After all of the threads, promises and futures have been created all of the threads are joined. Finally the partial sums are added to the total sum with calls to get() on all of the futures.

Conclusion

In this article we have looked at some of the building-blocks for multi-threaded programming in C++. Threads can be created, launched, detached and joined as desired, but for many applications such low-level considerations are not necessary. The std::async functionality is sufficient for many applications which require the return of a result value, while the use of std::future and std::promise provides the flexibility of using std::thread directly while still allowing the use of return values.

Source code for this article is available on GitHub.
