C++17 multi-threaded thread “pauser”

I don’t see any major problems with the code, but it is probably over-complicated for what you really want it to do. Of course, there’s no way to be sure, because you’re not actually saying what you want it to do. So this review is going to contain a lot of: “🤷🏼 Depends on what you’re actually doing.”

As is the norm with concurrent code these days, there’s the expected std::condition_variable—for reasons beyond me, std::condition_variable has become the std::endl of concurrent code. Everybody uses it; nobody really seems to understand why; and 95% of the time, a simple std::atomic_flag would do just as well (which also seems true in this case, but only with the C++20 std::atomic_flag). But is this actually a case where you need std::condition_variable? 🤷🏼 Depends on what you’re actually doing.

The one thing I try to impress on everybody with regards to concurrent programming is that concurrent programming is still in its infancy. Established programming paradigms have patterns that apply virtually universally; for example, the guideline against using goto in structured programming is pretty much a rule you can apply without thought all the time everywhere. But concurrent programming is still very much the wild west: there are very, very few “one size fits all” solutions.

What that means in this case is that there is no one right way to “pause” a thread. It really depends on how long you want to pause for. If you just want to pause for a few nanoseconds, then you can get away with a spin-lock (or something similar that doesn’t literally spin, like using _mm_pause on x86 architectures). If you need much more than that, maybe you want to yield the thread’s time slice. Much more than that again, and maybe you want to tell the thread scheduler to stop scheduling the thread until notified.
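To make those tiers concrete, here is a rough sketch of a wait that escalates from spinning, to yielding, to sleeping. This is my own illustration, not code from the review subject, and the names and iteration counts are arbitrary; a real implementation would block on a condition variable or a C++20 atomic wait in the last tier instead of polling with a sleep:

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical helper showing the escalation described above.
void adaptive_wait(std::atomic<bool> const& ready)
{
    // 1. spin for a handful of iterations (cheap if the wait is nanoseconds);
    //    on x86 you might drop an _mm_pause() into this loop
    for (int i = 0; i < 64; ++i)
    {
        if (ready.load(std::memory_order_acquire))
            return;
    }

    // 2. yield the time slice (for microsecond-to-millisecond waits)
    for (int i = 0; i < 64; ++i)
    {
        if (ready.load(std::memory_order_acquire))
            return;
        std::this_thread::yield();
    }

    // 3. give up the CPU entirely (for long waits); a real implementation
    //    would block until notified rather than poll like this
    while (not ready.load(std::memory_order_acquire))
        std::this_thread::sleep_for(std::chrono::milliseconds{1});
}
```

Each tier trades resume latency for CPU usage, which is exactly the trilemma discussed below.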

Basically, it is impossible to have a “one size fits all” solution for “pausing” because you have to balance:

  • how long you want to pause
  • how quickly you want to resume when unpaused; and
  • how many resources you want to use.

This is an engineering trilemma (similar to “you can have it quickly, you can have it cheaply, you can have it done well; choose any two”). There is no “correct” solution.

Given that you’re not telling what you actually want to do, I have to guess that you don’t intend to pause for long periods of time. The reason why I assume that is because if you are pausing for really long periods, it doesn’t really make sense to keep the threads live and sleeping… it makes much more sense to release those resources, and reacquire them if you need them again—that’s just being a much better citizen on limited-resource devices. On the other hand, if you’re not targeting limited-resource devices, then holding on to those threads and sleeping them for long periods would make sense. So, once again: 🤷🏼 Depends on what you’re actually doing.

There’s also another dimension worth mentioning here. When you say something like, “I want to pause a bunch of threads,” you’re not actually saying what you really want to do. You’re missing the forest for the trees. You’re (almost certainly) not writing a program whose purpose is to create and manage a bunch of threads; you’re writing a program that does something actually useful, and managing those threads is just an implementation issue. For that reason, thread management shouldn’t be the focus… if anything, it should be pretty much invisible. And most modern concurrent tools are designed around that thinking.

Put another way: if you are thinking, “I want to pause a bunch of a threads,” then you’re thinking wrong. You should be thinking, “I want to do a bunch of tasks, but sometimes it will be necessary to wait for the next set of tasks to be ready”. Worded that way, the entire notion of a thread pausing utility is specious. You probably want something more like a concurrent queue, which will make handling a bunch of tasks much more natural, and will automatically and invisibly handle all the pausing necessary, AND even automatically handle scaling.
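To illustrate the idea, here is a bare-bones sketch of such a concurrent queue (my own minimal take, not a production design; real ones also need bounded capacity, exception safety, and so on). Workers calling pop() on an empty queue are “paused” automatically, and close() handles shutdown:

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>

template <typename T>
class concurrent_queue
{
public:
    void push(T value)
    {
        {
            auto lock = std::lock_guard{_mutex};
            _queue.push(std::move(value));
        }
        _cv.notify_one();
    }

    // blocks until a task is available or close() is called; the "pausing"
    // happens here, invisibly, whenever the queue is empty
    std::optional<T> pop()
    {
        auto lock = std::unique_lock{_mutex};
        _cv.wait(lock, [this]{ return _closed or not _queue.empty(); });
        if (_queue.empty())
            return std::nullopt; // closed and drained: worker should exit
        auto value = std::move(_queue.front());
        _queue.pop();
        return value;
    }

    void close()
    {
        {
            auto lock = std::lock_guard{_mutex};
            _closed = true;
        }
        _cv.notify_all();
    }

private:
    std::queue<T> _queue;
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _closed = false;
};
```

Note how “no tasks ready” and “job over” fall out of the same pop() interface, which is the point being made here.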

But, as always: 🤷🏼 Depends on what you’re actually doing.

HOWEVER, there’s a reason that other way of thinking is so prevalent. If you just focus on the pausing, you’re failing to see the bigger picture and introducing all kinds of unnecessary headaches. What happens if the “pause” is never “unpaused”? In that case, your app locks up. How does pausing interact with stopping/ending the thread? These issues are problems with a design based on “pausing threads”… but they would evaporate entirely if your focus was on the actual task(s) at hand:

  • What happens if no new tasks ever become ready? Meh, same thing that happens when all tasks are done. Not a problem.
  • How does no tasks being ready interact with ending the overall job? Quite naturally; the job obviously ends when there are no more tasks to do. So, not a problem.

So here’s a summary of the design overview:

  • Your current design may be absurdly overwrought for what you need… or it may be perfect. 🤷🏼 Depends on what you’re actually doing.
  • The entire concept may be pointless and misguided; rather than a “thread pauser”, you may actually need something completely different, like a concurrent queue… or it may be exactly what you need. 🤷🏼 Depends on what you’re actually doing.
  • The concept may be fundamentally flawed, because it doesn’t actually model what you really need, and for that reason doesn’t interact well with the rest of the design.

So this review might end up being completely useless to you. But you can’t say you weren’t warned.

explicit ThreadPause(bool paused = false);

I am generally not a fan of defaulted parameters—they cause far more trouble than they’re generally worth, especially in a case like this.

On top of that, using a bool in this context kind of muddies the waters. When I see auto thread_pause = util::ThreadPause{true};, it’s not actually clear that it means any thread using thread_pause will start out paused… or rather, will not “start” at all. That is pretty dangerous, because it could lock up the whole app. There doesn’t seem to be a good reason to have this kind of interface when the following is clearer:

auto thread_pause = util::ThreadPause{};
thread_pause.pause();

// now do whatever.

The only counter argument is that setting the bool in the constructor is much more efficient than calling pause(). That’s true, but it’s more a criticism of pause() than of the pattern… pause() shouldn’t be so inefficient. But we’ll get to that.

[[nodiscard]]
bool is_paused() const;

Given the purpose of this class, this function seems completely pointless. It will never be a good idea to use it, because by the time you get the result, it may no longer be true.

void wait();

This isn’t a great name for this function for several reasons. It’s not really clear that the intention is that this is the co-operation point between the controller and the worker thread. If the “pause” isn’t active, then you’re not actually waiting on anything.

I get that you want to make the logical connection to waiting (and maybe later you could add wait_for() and wait_until()), but given what a thread pausing thingy actually does (or rather, should do), it doesn’t quite match the model. But we’ll get more into that later.

Now into the actual implementation:

void util::ThreadPause::wait() {
    while(is_paused()){
        std::unique_lock<std::mutex> lock(m_mutex);
        //need the condition in here in case of spurious wake up, 
        //wont exit condition unless condition true
        m_cv.wait(lock, [&paused = m_paused]{return !paused;});
    }
}

I’m not sure a loop makes sense here. That’s a rather vicious pausing strategy. What it would mean is that someone calls resume()… then the worker thread notices the pause is no longer active, the condition is satisfied, and the lock released… but then, in that tiny space between destructing lock and checking the loop condition again, someone calls pause() again.

It’s a little bit absurd to assume that would likely happen, or that if it did, that it should be honoured. If the thread was unpaused, you might as well let it go ahead and do its work… if it happened to be re-paused while attempting to do that, it will notice it on the next loop. That seems fine enough.

void util::ThreadPause::pause() {
    //lock around the variable so with can modify it
    std::unique_lock<std::mutex> lock(m_mutex);
    m_paused = true;
}

void util::ThreadPause::resume() {
    //lock around the variable so with can modify it
    std::unique_lock<std::mutex> lock(m_mutex);
    m_paused = false;
    //unlock to stop threads from immediately locking when notify is called.
    lock.unlock();
    m_cv.notify_all();
}

These are where I’d say the biggest over-engineering problems lie, as well as the biggest efficiency headaches. You are correct to protect a naked bool with a mutex. The question is… does it really need to be a naked bool?

What if m_paused were a std::atomic<bool> instead?

Let’s start with pause().

You don’t specify what is supposed to happen if pause() is called while already paused, but it does just work. However, whether already paused or not, the calling thread has to acquire a lock, which will probably block (because all the worker threads using the object are repeatedly calling wait(), which also locks the same mutex). That block is completely unnecessary. All you really want to do is properly set m_paused and publish that. That’s what atomics are for:

class ThreadPause
{
    // ... (snip) ...

    std::atomic<bool> m_paused;
    std::mutex m_mutex;
    std::condition_variable m_cv;
};

void util::ThreadPause::wait()
{
    if (m_paused) // or: m_paused.load(std::memory_order_acquire)
    {
        auto lock = std::unique_lock{m_mutex};
        m_cv.wait(lock, [&paused = m_paused]{ return not paused; }); // or with acquire
    }
}

void util::ThreadPause::pause()
{
    m_paused = true; // or: m_paused.store(std::memory_order_release)
}

void util::ThreadPause::resume()
{
    if (m_paused.exchange(false)) // memory_order_acq_rel
    {
        // briefly take and drop the lock so a waiter caught between its
        // predicate check and its sleep cannot miss the notification
        { std::lock_guard<std::mutex> lock{m_mutex}; }
        m_cv.notify_all();
    }
}

wait() is basically unchanged (though in practice, the case when unpaused is resolved MUCH faster because there will be less pointless locking/unlocking). pause() and resume() are now going to be MUCH faster, and possibly lock-free.

Note that in C++20, atomics have evolved to include wait-notify semantics, and are MUCH more efficient than condition variables. So you could do:

class ThreadPause
{
    // ... (snip) ...

    std::atomic_flag m_paused;
};

void util::ThreadPause::wait()
{
    m_paused.wait(true); // or: m_paused.wait(true, std::memory_order::acquire)
}

void util::ThreadPause::pause()
{
    m_paused.test_and_set(); // memory_order::acq_rel
                             // possibly memory_order::release?
}

void util::ThreadPause::resume()
{
    if (m_paused.test()) // memory_order::acquire
    {
        m_paused.clear(); // memory_order::release
        m_paused.notify_all();
    }
}

The downside is that if you want to implement wait_for() or wait_until(), things get tougher.
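For what it’s worth, the condition-variable flavour admits a timed wait fairly naturally. A sketch with my own naming (not the reviewed code); C++20 atomic wait/notify offers no timeout, which is the downside just mentioned:

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

class pauser
{
public:
    void pause() { m_paused.store(true, std::memory_order_release); }

    void resume()
    {
        if (m_paused.exchange(false))
        {
            // briefly take the lock so a waiter between its predicate check
            // and its sleep cannot miss the notification
            { std::lock_guard<std::mutex> lock{m_mutex}; }
            m_cv.notify_all();
        }
    }

    // returns true if running (or resumed in time), false if the timeout hit
    template <typename Rep, typename Period>
    bool wait_for(std::chrono::duration<Rep, Period> timeout)
    {
        if (not m_paused.load(std::memory_order_acquire))
            return true; // not paused: nothing to wait for
        auto lock = std::unique_lock{m_mutex};
        return m_cv.wait_for(lock, timeout,
            [this]{ return not m_paused.load(std::memory_order_acquire); });
    }

private:
    std::atomic<bool> m_paused{false};
    std::mutex m_mutex;
    std::condition_variable m_cv;
};
```

wait_until() would look the same with std::condition_variable::wait_until().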

An alternative concept

You have already noticed, as seen in the example code, that to properly control a thread you need a pause token and an exit/stop token. You currently have them as two separate things… which creates problems. (What happens if pause is true and exit is true? Wait to exit? What order do you check pause/exit in? And so on.)

Perhaps it would make more sense to create a unified control token that determines whether a thread should run, or should be suspended temporarily (paused), or suspended permanently (stopped). This would simplify both control and setup (because you don’t need to pass multiple “things” to the thread to control it).

You’ll notice I’ve also been using specific terminology, talking about “tokens”. That’s because I’m looking at the C++20 <stop_token> library. <stop_token> goes half-way to what you need; it controls stopping threads, but not pausing them. So you need to extend it. Maybe something like this:

namespace util {

enum class control_state
{
    running,
    paused,
    stopped
};

class control_token
{
public:
    auto request_stop() noexcept -> void;    // sets state to stopped
    auto request_pause() noexcept -> void;   // sets state to paused if not stopped
    auto request_unpause() noexcept -> void; // sets state to running if not stopped

    [[nodiscard]] auto stop_requested() const noexcept -> bool;
    [[nodiscard]] auto stop_possible() const noexcept -> bool;

    [[nodiscard]] auto state() const noexcept -> control_state;
};

// used to control multiple threads
class control_source
{
public:
    auto request_stop() noexcept -> void;
    auto request_pause() noexcept -> void;
    auto request_unpause() noexcept -> void;

    [[nodiscard]] auto stop_requested() const noexcept -> bool;
    [[nodiscard]] auto stop_possible() const noexcept -> bool;

    [[nodiscard]] auto state() const noexcept -> control_state;

    [[nodiscard]] auto get_token() const noexcept -> control_token;
};

// blocks if paused, otherwise returns either running or stopped
[[nodiscard]] auto control_wait(control_token const&) -> control_state;
[[nodiscard]] auto control_wait(control_source const&) -> control_state;
// maybe also wait_for() and wait_until()

// maybe also control_callback

class controlled_thread
{
public:
    ~controlled_thread();

    auto get_control_source() const noexcept -> control_source;
    auto get_control_token() const noexcept -> control_token;

    auto request_stop() noexcept -> bool;
    auto request_pause() noexcept -> bool;
    auto request_unpause() noexcept -> bool;

    // other, standard thread functions

private:
    std::thread _thread;     // exposition only
    control_source _control; // exposition only
};

controlled_thread::~controlled_thread()
{
    if (_thread.joinable())
    {
        _control.request_stop();
        _thread.join();
    }
}

} // namespace util
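The shared state behind that interface might be sketched like this. This is my own minimal take, restating the enum and covering only the pause/stop logic; the token type, stop_possible(), callbacks, and so on are left out:

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>

enum class control_state { running, paused, stopped };

class control_source
{
public:
    void request_stop()
    {
        {
            auto lock = std::lock_guard{_state->mutex};
            _state->state = control_state::stopped;
        }
        _state->cv.notify_all(); // wake paused threads so they can exit
    }

    void request_pause()
    {
        auto lock = std::lock_guard{_state->mutex};
        if (_state->state != control_state::stopped)
            _state->state = control_state::paused;
    }

    void request_unpause()
    {
        {
            auto lock = std::lock_guard{_state->mutex};
            if (_state->state == control_state::stopped)
                return; // stopped is permanent
            _state->state = control_state::running;
        }
        _state->cv.notify_all();
    }

    // blocks while paused; returns running or stopped
    control_state wait() const
    {
        auto lock = std::unique_lock{_state->mutex};
        _state->cv.wait(lock,
            [s = _state.get()]{ return s->state != control_state::paused; });
        return _state->state;
    }

private:
    struct shared_state
    {
        std::mutex mutex;
        std::condition_variable cv;
        control_state state = control_state::running;
    };

    // copies of the source (and any tokens it hands out) share this state
    std::shared_ptr<shared_state> _state = std::make_shared<shared_state>();
};
```

The key property is that “stopped” wins over “paused”, so a paused thread can always be woken up to die; that is exactly the interaction the two-separate-flags design gets wrong.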

And you might rewrite your main() example like this:

auto main() -> int
{
    using namespace std::chrono_literals;

    auto thread_function = [](std::stop_token stop_token, util::control_source control_source, int& increment)
    {
        // note that we need to check *both* the stop_token and the control_source
        //
        // why?
        //
        // because the control_source is the overall control for the entire
        // collection of threads - if we use that to stop, it stops *all* the
        // threads
        //
        // the stop_token is the control for this one single thread - if that
        // says to stop, then only this one thread is being stopped
        //
        // we need both, because the control source is how we pause/unpause/stop
        // the threads AS A GROUP... while the stop token is how we handle
        // single threads being stopped separate from the group (usually by
        // being destroyed)
        //
        // if we don't use stop_token - if we only use control_source - then
        // we can't destroy individual threads without first stopping the
        // entire group... which is clunky and dangerous

        while (not stop_token.stop_requested() and control_wait(control_source) != control_state::stopped)
        {
            increment += 10;
        }
    };

    // control source to control multiple threads
    auto control_source = util::control_source{};

    std::size_t thread_count = 10;

    auto increments = std::vector(thread_count, 0);
    auto threads = std::vector<std::jthread>{};
    for (auto i = std::size_t{0}; i < thread_count; ++i)
    {
        threads.emplace_back(thread_function, control_source, std::ref(increments[i]));
    }

    // i used std::jthreads rather than util::controlled_threads because the
    // threads aren't being controlled (that is, paused/unpaused) individually
    //
    // however, you could use util::controlled_thread, which (with some minor
    // modifications to the thread function) would allow you to pause threads
    // individually, as well as pausing the entire group at once, if that's of
    // interest to you

    std::this_thread::sleep_for(100ms);

    control_source.request_pause();
    std::this_thread::sleep_for(100ms);
    std::cout << "Current values for increments:\n";
    for (auto increment : increments) {
        std::cout << increment << "\n";
    }
    std::this_thread::sleep_for(100ms);
    std::cout << "Pause values for increments:\n";
    for (auto increment : increments) {
        std::cout << increment << "\n";
    }

    control_source.request_unpause();
    std::this_thread::sleep_for(100ms);

    control_source.request_pause();
    std::this_thread::sleep_for(100ms);
    std::cout << "Resume values for increments:\n";
    for (auto increment : increments) {
        std::cout << increment << "\n";
    }

    // control_source.request_unpause(); // not necessary unless you actually want to resume before stopping
    
    control_source.request_stop(); // not strictly necessary (see next comment)

    // threads will automatically stop and join
    //
    // note that this will happen even if an exception is thrown (and caught),
    // unlike the original code
    //
    // if control_source.request_stop() was not called, that's fine, because
    // each individual thread in threads will still be stopped individually by
    // its stop_token on destruction (rather than all of them being stopped at
    // once by the shared control_source *then* being destroyed... same shit,
    // different pile)
}

This mechanism is non-trivial to implement, but it is so worth it when it comes to safety (not to mention efficiency). As you can see, it makes pretty much the entire last fifth or so of your main() unnecessary; all cleanup is automatic… and safe, because it happens even in the event of an exception… and everything cleans up properly even if the threads were paused at destruction time (which could easily happen if an unexpected exception is thrown while paused).