Thursday, December 12, 2013

Heterogeneous return values from thread pools

To perform an analysis means decomposing a situation or problem into its constituents. However, there are not any “primitives” that naturally exist. The components defined are arbitrary and depend on individual goals and methods of calculation. The basic elements of a fire are different to a fire fighter, an insurance claims adjuster, and an arson investigator.

-- Gary Klein, Sources of Power

This blog item is about a small extension to a simple thread pool that makes life a little more bearable when writing code that runs many tasks in parallel, where the tasks return values of different types.

The code from this entry and some of the previous entries can be retrieved from: https://github.com/hansewetz/blog

A simple thread pool (found somewhere on the Web)

Thread pools are useful constructs. When quickly scribbling down prototype code for multi-threaded applications, or experimenting with how best to manage the parallelisation of code, having a couple of good, flexible thread pools at hand is indispensable.

The thread pool I'll use as a base can be found all over the Internet. It's a simple pool managing a queue of pending tasks and a collection of threads. It does not use condition variables to wake up threads when more work is available. Instead, a thread that finds the task queue empty simply calls std::this_thread::yield() and checks again the next time it is scheduled, so idle threads keep spinning rather than sleeping. The pool implementation is fairly standard, so I'll just give the code here:

#include <iostream>
#include <thread>
#include <atomic>
#include <queue>
#include <vector>
#include <tuple>
#include <mutex>
#include <future>
#include <chrono>
#include <memory>
#include <type_traits>

// task class (hides actual type of function to be executed)
class task{
    struct base{
      virtual ~base(){}   // impl_ deletes through a base pointer
      virtual void call()=0;
    };
    template<typename F>
    struct derived:base{
      derived(F&&f):f_(std::move(f)){}
      void call(){f_();}
      F f_;
    };
    std::unique_ptr<base>impl_;
public:
    // ctor, assign, dtor
    task()=default;
    template<typename F>task(F&&f):impl_(new derived<F>(std::move(f))){}
    task(task&&other):impl_(std::move(other.impl_)){}
    task&operator=(task&&tsk){impl_=std::move(tsk.impl_);return *this;}

    // call
    void operator()(){impl_->call();}
};
// thread pool class
class tpool{
public:
  // ctor
  tpool(std::size_t nthreads):nthreads_(nthreads),done_(false){
    for(std::size_t i=0;i<nthreads_;++i){
      std::thread thr(&tpool::thread_func,this);
      threads_.push_back(std::move(thr));
    }
  }
  // dtor
  ~tpool(){
    done_=true;
    for(auto&t:threads_)t.join();
  }
  // get #of threads
  std::size_t nthreads()const{return nthreads_;}

  // submit task: wrap f in a packaged_task, keep its future, enqueue the task
  template<typename F,typename R=typename std::result_of<F()>::type>
  std::future<R>submit(F f){
    std::packaged_task<R()>pt(std::move(f));
    auto ret=pt.get_future();
    add_task(task(std::move(pt)));
    return ret;
  }
private:
  // thread function
  void thread_func(){
    while(!done_){
      try{
        task tsk;
        if(try_get(tsk))tsk();
        else std::this_thread::yield();
      }
      catch(...){
        done_=true;
      }
    }
  }
  // get next task
  bool try_get(task&tsk){
    std::unique_lock<std::mutex>lock(qmtx_);
    if(taskq_.empty())return false;
    tsk=std::move(taskq_.front());
    taskq_.pop();
    return true;
  }
  // add task to queue
  void add_task(task&&tsk){
    std::unique_lock<std::mutex>lock(qmtx_);
    taskq_.push(std::move(tsk));
  }
  // state
  std::size_t nthreads_;
  std::vector<std::thread>threads_;
  std::atomic_bool done_;
  std::queue<task>taskq_;
  std::mutex qmtx_;
};

The submit method wraps the user function in a std::packaged_task, retrieves the std::future from the packaged_task, wraps the std::packaged_task in a task and finally enqueues the task in the task queue. The std::future is returned to the caller.

Once a task is in the task queue, a thread picks it up and executes it. The caller retrieves the return value of the task by waiting for the future:

#include <functional>
#include <string>
using namespace std;

// main test program
int main(){
  // create function returning a string
  auto f=[]()->string{return "Hello";};

  // submit task
  tpool tp(2);
  auto fut=tp.submit(f);
  cout<<fut.get()<<endl;
}

Being lazy and letting others do the waiting

Being a lazy person, I would like to submit multiple tasks in one shot, have the thread pool wait for them to complete, and have all return values handed to me in one go. And, since I'm lazy, I don't want to worry about the tasks having different return types: I just want all return values, regardless of type, returned to me in one blob. The natural container for that is a std::tuple, whose element types can all differ.

I'll add the functionality as follows. First, I'll expose a variadic member function template taking a variable number of callable objects, possibly each returning a different type. Second, I'll submit the tasks, collecting the future of each task. Third, I'll wait for the futures and build a tuple containing the return values. Finally, I'll return the tuple:

…
  // submit a set of tasks and return a tuple containing results of tasks
  // (RET is default constructed below, so every result type must be default constructible)
  template<typename...F,typename RET=std::tuple<typename std::result_of<F()>::type...>>
  RET submitTasksAndWait(F...f){
    auto futs=submitTasksReturnFuts(f...);
    RET ret;
    waitForFutures(ret,std::move(futs));
    return ret;
  }
private:
  // submit tasks and return futures in a tuple
  template<typename F,typename...G,typename RET=std::tuple<std::future<typename std::result_of<F()>::type>,std::future<typename std::result_of<G()>::type>...>>
  RET submitTasksReturnFuts(F f,G...g){
    auto tu1=std::make_tuple(submit(f));
    auto tu2=submitTasksReturnFuts(g...);
    return std::tuple_cat(std::move(tu1),std::move(tu2));
  }
  template<typename F,typename FUTS=std::tuple<std::future<typename std::result_of<F()>::type>>>
  FUTS submitTasksReturnFuts(F f){
    return std::make_tuple(submit(f));
  }
  // wait for futures stored in a tuple and return results
  template<typename R1,typename...R2,typename F1,typename...F2>
  void waitForFutures(std::tuple<R1,R2...>&ret,std::tuple<F1,F2...>&&futs){
    auto r1=std::make_tuple(std::get<0>(futs).get());
    auto r2=tutils::popn_front_tuple<1>(ret);
    auto t2=tutils::popn_front_tuple<1>(futs);
    waitForFutures(r2,std::move(t2));
    ret=std::tuple_cat(std::move(r1),std::move(r2));
  }
  template<typename R1,typename F1,typename RET=std::tuple<R1>>
  void waitForFutures(std::tuple<R1>&ret,std::tuple<F1>&&fut){
    std::get<0>(ret)=std::get<0>(fut).get();
  }
…

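The code uses tutils::popn_front_tuple, which pops the first element off a tuple. The function is part of the tutils namespace and can be retrieved from https://github.com/hansewetz/blog. The test program below also uses transform_tuple, type2string_helper and an output operator for tuples, none of which are shown in this post. The code can be used as follows: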

// main test program
int main(){
  // create functions returning a string and a std::size_t
  auto f=[]()->string{return "Hello";};
  auto g=[]()->std::size_t{return 17;};

  // create and submit tasks
  tpool tp(2);
  auto res=tp.submitTasksAndWait(f,f,g,f,g,g,g,g,f,f,g,g,g);
  cout<<transform_tuple(type2string_helper(),res)<<endl;
  cout<<"res: "<<res<<endl;
}

What if a task returns void?

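The problem with the code in the previous section is that it cannot handle tasks returning void: a std::tuple cannot hold an element of type void, so there is nowhere to put the "result" of a std::future<void>. I'll solve that by injecting a dummy type void_t into the return tuple whenever a task's return type is void.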

Before listing the code I'll make a few points. First, I moved the class task inside the tpool class. I also added a metafunction, fret_type, which I use when deducing the return type of a function. Two overloads of get_fut for retrieving the value from a future are also added: one returns the actual return value, whereas the other waits on the future and returns an object of void_t. waitForFutures now receives the result type through a null pointer used purely as a type tag, so the result types no longer need to be default constructible. Two submit functions are exposed: submitTaskTupleAndWait, taking a tuple containing the functions to be submitted, and submitTasksAndWait, taking a variadic list of functions (as in the previous example).

For clarity I'll show the entire piece of code in one shot here:

#ifndef TPOOL_H
#define TPOOL_H
#include "type-utils/type_utils.h"
#include <ostream>
#include <thread>
#include <atomic>
#include <queue>
#include <vector>
#include <tuple>
#include <mutex>
#include <future>
#include <chrono>
#include <memory>
#include <type_traits>

namespace tutils{

// ------------------- tpool --------------
class tpool{
private:
  // forward decl.
  template<typename F,typename R=typename std::result_of<F()>::type>struct fret_type;
  class task;
public:
  // return type for functions returning void
  struct void_t{};

  // ctor
  tpool(std::size_t nthreads):nthreads_(nthreads),done_(false){
    for(std::size_t i=0;i<nthreads_;++i){
      std::thread thr(&tpool::thread_func,this);
      threads_.push_back(std::move(thr));
    }
  }
  // dtor
  ~tpool(){
    done_=true;
    for(auto&t:threads_)t.join();
  }
  // get #of threads
  std::size_t nthreads()const{return nthreads_;}

  // submit task: wrap f in a packaged_task, keep its future, enqueue the task
  template<typename F,typename R=typename std::result_of<F()>::type>
  std::future<R>submit(F f){
    std::packaged_task<R()>pt(std::move(f));
    auto ret=pt.get_future();
    add_task(task(std::move(pt)));
    return ret;
  }
  // submit tuple of tasks and return a tuple containing results of tasks
  template<typename...F,typename RET=std::tuple<typename fret_type<F>::type...>>
  RET submitTaskTupleAndWait(std::tuple<F...>ftu){
    auto futs=submitTasksReturnFuts(std::move(ftu));
    RET*dummy=nullptr;   // null pointer used only to carry the result type
    return waitForFutures(dummy,std::move(futs));
  }
  // submit list of tasks and return a tuple containing results of tasks
  template<typename...F,typename RET=std::tuple<typename fret_type<F>::type...>>
  RET submitTasksAndWait(F...f){
    std::tuple<F...>ftu{std::move(f)...};
    return submitTaskTupleAndWait(std::move(ftu));
  }
private:
  // submit tasks and return futures in a tuple
  template<typename F,typename...G,typename RET=std::tuple<std::future<typename std::result_of<F()>::type>,std::future<typename std::result_of<G()>::type>...>>
  RET submitTasksReturnFuts(std::tuple<F,G...>ftu){
    auto tu1=std::make_tuple(submit(std::get<0>(ftu)));
    auto tu2=submitTasksReturnFuts(tutils::popn_front_tuple<1>(ftu));
    return std::tuple_cat(std::move(tu1),std::move(tu2));
  }
  template<typename F,typename FUTS=std::tuple<std::future<typename std::result_of<F()>::type>>>
  FUTS submitTasksReturnFuts(std::tuple<F>ftu){
    return std::make_tuple(submit(std::get<0>(ftu)));
  }
  // wait for futures stored in a tuple and return results
  template<typename R1,typename...R2,typename F1,typename...F2>
  std::tuple<R1,R2...>waitForFutures(std::tuple<R1,R2...>*,std::tuple<F1,F2...>&&futs)const{
    // use references to futures
    auto&fu1=std::get<0>(futs);
    auto r1=std::make_tuple(get_fut(static_cast<R1*>(nullptr),fu1));
    auto t2=tutils::popn_front_tuple<1>(futs);
    std::tuple<R2...>*dummy=nullptr;   // type tag only, never dereferenced
    auto r2=waitForFutures(dummy,std::move(t2));
    return std::tuple_cat(std::move(r1),std::move(r2));
  }
  template<typename R1,typename F1>
  std::tuple<R1>waitForFutures(std::tuple<R1>*,std::tuple<F1>&&fut)const{
    // use reference to future
    auto&fu1=std::get<0>(fut);
    return std::make_tuple(get_fut(static_cast<R1*>(nullptr),fu1));
  }
  // wait for future and return result
  template<typename R,typename FU>
  R get_fut(R*,FU&fu)const{
    return fu.get();
  }
  template<typename FU>
  void_t get_fut(void_t*,FU&fu)const{
    fu.get();
    return void_t();
  }
  // thread function
  void thread_func(){
    while(!done_){
      try{
        task tsk;
        if(try_get(tsk))tsk();
        else std::this_thread::yield();
      }
      catch(...){
        done_=true;
      }
    }
  }
  // get next task
  bool try_get(task&tsk){
    std::unique_lock<std::mutex>lock(qmtx_);
    if(taskq_.empty())return false;
    tsk=std::move(taskq_.front());
    taskq_.pop();
    return true;
  }
  // add task to queue
  void add_task(task&&tsk){
    std::unique_lock<std::mutex>lock(qmtx_);
    taskq_.push(std::move(tsk));
  }
  // meta function returning 'void_t' as return type if F returns void, else as std::result_of<F>::type
  template<typename F,typename R>
  struct fret_type{
    using type=typename std::conditional<std::is_same<R,void>::value,void_t,R>::type;
  };
  // task class (hides actual type of function to be executed)
  class task{
      struct base{
        virtual ~base(){}   // impl_ deletes through a base pointer
        virtual void call()=0;
      };
      template<typename F>
      struct derived:base{
        derived(F&&f):f_(std::move(f)){}
        void call(){f_();}
        F f_;
      };
      std::unique_ptr<base>impl_;
  public:
      // ctor, assign, dtor
      task()=default;
      template<typename F>task(F&&f):impl_(new derived<F>(std::move(f))){}
      task(task&&other):impl_(std::move(other.impl_)){}
      task&operator=(task&&tsk){impl_=std::move(tsk.impl_);return *this;}

      // call
      void operator()(){impl_->call();}
  };
  // state
  std::size_t nthreads_;
  std::vector<std::thread>threads_;
  std::atomic_bool done_;
  std::queue<task>taskq_;
  std::mutex qmtx_;
};
// print operator for tpool::void_t (inline, since the header may be included in several translation units)
inline std::ostream&operator<<(std::ostream&os,tpool::void_t const&){return os<<'.';}
}
#endif

I can use the code as follows (notice that one of the functions has a void return type):

#include "tpool.h"
#include <functional>
#include <iostream>
#include <string>

using namespace std;
using namespace tutils;

struct Foo{
  Foo(){cout<<"ctor"<<endl;}
  Foo(string const&str){cout<<"string ctor"<<endl;}
  Foo(Foo const&f){cout<<"copy ctor"<<endl;}
  Foo(Foo&&f){cout<<"move ctor"<<endl;}
  Foo&operator=(Foo const&f){cout<<"copy assignment"<<endl;return *this;}
  Foo&operator=(Foo&&f){cout<<"move assign"<<endl;return *this;}
  ~Foo(){cout<<"dtor"<<endl;}
};
ostream&operator<<(ostream&os,Foo const&foo){return os<<"foo ";}

// main test program
int main(){
  // create functions returning Foo and void
  auto f=[]()->Foo{return Foo("Hello");};
  auto g=[]()->void{cout<<"world"<<endl;};
  function<Foo()>ff=f;

  // create and submit tasks
  {
    tpool tp(2);
    auto res=tp.submitTasksAndWait(ff,f,[]()->int{return 6;},g);
    cout<<transform_tuple(type2string_helper(),res)<<endl;
    cout<<"res: "<<res<<endl;
  }
  {
    tpool tp(2);
    auto tu=make_tuple(ff,f,[]()->int{return 6;},g);
    auto res=tp.submitTaskTupleAndWait(tu);
    cout<<transform_tuple(type2string_helper(),res)<<endl;
    cout<<"res: "<<res<<endl;
  }
}

Conclusions

In this item I only gave a mini example of how to customise a simple thread pool. However, with C++11 features such as variadic templates and tuples, together with the utilities in type_traits, it's in general relatively simple to implement flexible, compile-time polymorphic data structures. Getting used to developing code on both sides of the fence dividing compile time from runtime takes quite a lot of painful late-night coding, often pushing one's patience to the limit. However, once the Aha! moment hits you, you might wonder why you didn't code like this all along.