A simple C++11 concurrent workqueue

Posted on September 30, 2014
Tags: C++, C++11, condition variable, lambda, mutex, thread

For a little toy project of mine (a Wikipedia XML dump word counter) I wrote a little C++11 helper class to distribute work to all available CPU cores. It took me many years to overcome my fear of threading: in the past, whenever I toyed with threaded code, I ended up with a lot of deadlocks and was generally confused. It appears that I have finally understood enough of this craziness to come up with the small helper class below.

The problem

We want to spread work amongst all available CPU cores. There are no dependencies between the items in our work queue, so every thread can just pick up and process the next item as soon as it is free.

The solution

This simple implementation makes use of C++11 threading primitives, lambda functions and move semantics. The idea is simple: you provide a function at construction time which defines how to process one item of work. To pass work to the queue, simply call the function operator of the object, once per item. When the destructor is called (once the object reaches the end of its scope), all remaining items are processed and all background threads are joined.

The number of threads defaults to the value of std::thread::hardware_concurrency(). In my earlier tests this function always returned 1. I don’t know exactly when GCC (or libstdc++, actually) started to support it properly, but at least since GCC 4.9 it is usable. On Linux, a mounted /proc is a prerequisite.
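One more thing to keep in mind: the standard allows std::thread::hardware_concurrency() to return 0 when the value cannot be determined, and the constructor shown further down rejects a concurrency of 0 with std::invalid_argument. A minimal sketch of a fallback follows. The helper name worker_count and its fallback parameter are my own invention for illustration and are not part of the class.

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper (not part of the distributor class): picks a worker
// count, substituting `fallback` when the core count cannot be determined.
unsigned int worker_count(unsigned int fallback = 1) {
  assert(fallback > 0);
  // hardware_concurrency() is only a hint and may legitimately return 0.
  const unsigned int detected = std::thread::hardware_concurrency();
  return detected != 0 ? detected : fallback;
}
```

The result could then be passed as the second constructor argument instead of relying on the default.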

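The maximum number of items per thread in the queue defaults to 1. The capacity of the queue is the number of threads multiplied by this value. If the queue is full, calls to the function operator block until a worker thread has taken an item.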
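The blocking rule can be looked at in isolation. The following is a stripped-down sketch and not the class from this post: bounded_queue, max_size_seen and run_demo are made-up names, the queue stores plain ints, and it uses notify_all with predicate waits purely to keep the example short.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical stand-in for the queue inside the distributor: push() blocks
// while `capacity` items are queued, pop() blocks while the queue is empty.
class bounded_queue {
  std::queue<int> items;
  std::mutex mutex;
  std::condition_variable changed;
  const std::size_t capacity;
  std::size_t high_water = 0;  // largest queue size observed so far

public:
  explicit bounded_queue(std::size_t capacity): capacity(capacity) {
    assert(capacity > 0);
  }

  void push(int value) {
    std::unique_lock<std::mutex> lock(mutex);
    changed.wait(lock, [this] { return items.size() < capacity; });
    items.push(value);
    if (items.size() > high_water) high_water = items.size();
    changed.notify_all();
  }

  int pop() {
    std::unique_lock<std::mutex> lock(mutex);
    changed.wait(lock, [this] { return !items.empty(); });
    const int value = items.front();
    items.pop();
    changed.notify_all();
    return value;
  }

  std::size_t max_size_seen() {
    std::lock_guard<std::mutex> guard(mutex);
    return high_water;
  }
};

// Pushes 1..count through the queue while a single consumer thread sums
// whatever it pops; returns that sum once the consumer has finished.
long run_demo(bounded_queue &queue, int count) {
  long sum = 0;
  std::thread consumer([&] {
    for (int i = 0; i < count; ++i) sum += queue.pop();
  });
  for (int i = 1; i <= count; ++i) queue.push(i);  // blocks whenever full
  consumer.join();
  return sum;
}
```

With a capacity of 2, the producer can never get more than two items ahead of the consumer, which is exactly what keeps memory usage bounded when input arrives faster than it can be processed.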

So the most basic usage example is probably something like:

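#include <iostream>
#include <string>
#include <utility>
// ... plus the distributor class template shown further down

int main() {
  typedef std::string item_type;
  // The lambda runs on the worker threads, once per queued item.
  distributor<item_type> process([](item_type &item) {
    // do work
  });

  // Lines from standard input are just one example of an input source.
  // Each call blocks while the queue is full.
  for (item_type line; std::getline(std::cin, line); )
    process(std::move(line));

  return 0;  // the destructor drains the queue and joins the threads
}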

That is about as simple as it can get, IMHO.
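The std::move in the loop is needed because the function operator takes its argument as an rvalue reference (Type &&), so a named object cannot be passed directly. Here is a small sketch of that rule using a plain std::queue. The functions enqueue and enqueue_demo are hypothetical stand-ins I made up for illustration, not part of the class.

```cpp
#include <cstddef>
#include <queue>
#include <string>
#include <utility>

// Hypothetical stand-in for the function operator: like it, this accepts
// only rvalues, so a named object has to go through std::move.
void enqueue(std::queue<std::string> &queue, std::string &&value) {
  queue.push(std::move(value));  // moves the string into the queue
}

// Queues one named string and one temporary; returns the queue length.
std::size_t enqueue_demo(std::queue<std::string> &queue) {
  std::string line = "a line long enough to live on the heap";
  // enqueue(queue, line);          // would not compile: line is an lvalue
  enqueue(queue, std::move(line));  // fine; line is now valid but unspecified
  enqueue(queue, std::string("temporary"));  // temporaries bind directly
  return queue.size();
}
```

After the move, the source string may be assigned to or refilled, but its contents should not be relied upon.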

The code can be found in the GitHub project mentioned above. However, since the class template is relatively short, here it is.

#include <condition_variable>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

template <typename Type, typename Queue = std::queue<Type>>
class distributor: Queue, std::mutex, std::condition_variable {
  typename Queue::size_type capacity;
  bool done = false;
  std::vector<std::thread> threads;

public:
  template<typename Function>
  distributor( Function function
             , unsigned int concurrency = std::thread::hardware_concurrency()
             , typename Queue::size_type max_items_per_thread = 1
             )
  : capacity{concurrency * max_items_per_thread}
  {
    if (not concurrency)
      throw std::invalid_argument("Concurrency must be non-zero");
    if (not max_items_per_thread)
      throw std::invalid_argument("Max items per thread must be non-zero");

    for (unsigned int count {0}; count < concurrency; count += 1)
      threads.emplace_back(static_cast<void (distributor::*)(Function)>
                           (&distributor::consume), this, function);
  }

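  // Neither movable nor copyable: the worker threads hold a pointer to this
  // object, and std::mutex and std::condition_variable cannot be moved.
  distributor(distributor &&) = delete;
  distributor &operator=(distributor &&) = delete;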

  ~distributor()
  {
    {
      std::lock_guard<std::mutex> guard(*this);
      done = true;
      notify_all();
    }
    for (auto &&thread: threads) thread.join();
  }

  // Queues one item; blocks while the queue is at capacity.
  void operator()(Type &&value)
  {
    std::unique_lock<std::mutex> lock(*this);
    while (Queue::size() == capacity) wait(lock);
    Queue::push(std::move(value));
    notify_one();
  }

private:
  template <typename Function>
  void consume(Function process)
  {
    std::unique_lock<std::mutex> lock(*this);
    while (true) {
      if (not Queue::empty()) {
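        // Parentheses avoid accidentally picking an initializer_list
        // constructor of Type.
        Type item(std::move(Queue::front()));
        Queue::pop();
        notify_one();   // a slot is free again: wake a waiting thread
        lock.unlock();  // do not hold the lock while processing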
        process(item);
        lock.lock();
      } else if (done) {
        break;
      } else {
        wait(lock);
      }
    }
  }
};

If you have any comments regarding the implementation, please drop me a mail.