Thursday, July 31, 2014

Using queues with boost::asio - Part I

There are things known, and there are things unknown... and in between there are the doors

-- The Doors adopted the saying of William Blake

Once in a while I stumble when a boost::asio based design suddenly requires me to listen on an in-memory queue, or any other queue for that matter. Usually the way out is to add one or more threads to the design. A better way would be to have an actual boost::asio queue listener available. This is what this blog item is about.

Introduction

Some time ago I finally sat down and hacked in a small queue listener aligned with the boost::asio framework. The design is simple and somewhat generic and is described in this blog entry together with the code and sample tests.

Also, as you will see, there is no magic in the machinery behind boost::asio IO objects - they are in fact rather simple and just require some thought before designing and coding them.

This blog entry describes a simple extension to boost::asio which listens on queues. The queue must have certain characteristics. A simple queue tailored for working with the queue listener is provided in the code. A small example shows how the queue listener can be used.

In case you find bugs or blatant errors in design, code or both please feel free to contact me. Also, if you want to get directly to the code skip the following sections and go to the end of this blog item.

The code is a first cut: it works, but some features are missing and some parts should probably be thought through more carefully.

What I want

I'll start with an example showing what I would like to be able to do:

// queue listener handler for queue 1
template<typename T>
void qhandler1(boost::system::error_code const&ec,T item,boost::asio::simple_queue_listener<T>*asioq,shared_ptr<boost::asio::simple_queue<string>>q1){
  if(!ec){
    // process queue item
    // ...

    // re-charge queue listener
    asioq->async_deq(q1,std::bind(qhandler1<T>,_1,_2,asioq,q1));
  }
}
// test program
int main(){
  try{
    // underlying queue
    shared_ptr<boost::asio::simple_queue<string>>q1{new boost::asio::simple_queue<string>};

    // asio io service
    boost::asio::io_service ios;

    // asio queue listeners
    boost::asio::simple_queue_listener<string>qlistener1(ios);
    qlistener1.async_deq(q1,std::bind(qhandler1<string>,_1,_2,&qlistener1,q1));

    // ... setup other asio objects

    // run io service
    ios.run();
  }
  catch(exception const&e){
    BOOST_LOG_TRIVIAL(debug)<<"caught exception: "<<e.what();
  }
}

The queue I use here, boost::asio::simple_queue, is a simple queue which supports the requirements of boost::asio::simple_queue_listener. However, any queue should do as long as it implements the required functionalities. I'll describe boost::asio::simple_queue later on.

The queue listener behaves like any other asio object when there is no more work to be done. When boost::asio detects that there is no more pending work it will no longer generate asynchronous events by calling the handler. The listener also supports creation of multiple IO objects listening on the same or on different queues.

The code runs a separate thread for each IO object to ensure that the thread managing the main io_service object is not blocked. There are other ways to handle it but this way works and is simple to code and understand.

Structure: the rules of the game

Many developers have worked with boost::asio but have not really looked under the hood to see how the IO objects actually work. The best way to understand what's going on inside boost::asio is to read the code of existing extensions as well as the boost::asio code itself.

Underneath each type of IO object is a single instance of a service object, the boost::asio::io_service::service. Each instance of an IO object has a corresponding implementation object. The service object is responsible for managing the implementation objects. For example, when an implementation object should be destroyed a destroy(Impl) method is called on the service object.

The implementation object is responsible for actually performing the asynchronous operation. In this case the implementation object waits for a message on a queue and pops the message off the queue when one is available.

The code in boost::asio::simple_queue_listener runs a separate thread for each implementation object so the thread managing the main io_service object is not blocked. There are other ways to design it, but here I've chosen one which is simple and straightforward.

All in all we need to implement three classes: one for the IO object, one for the underlying service and one for the implementation. I'll also implement a simple queue class which will serve as the default queue when using the queue listener. However, the queue listener could also use any other queue implementation as long as it satisfies certain requirements which I'll go through later. It should be noted that the queue class can be used as a standalone class without any dependency on boost::asio.

Behaviour: the rules of the game

A few points about what happens, and when, are important to understand when implementing an extension to boost::asio. I'll describe a few of them here.

First, service objects are destroyed when the corresponding boost::asio::io_service object is destroyed. This means that service objects may hang around long after there is no more work to be done. Therefore, some care should be taken to ensure that we don't hang onto resources even if there is no work to be done.

It does make sense that service objects hang around even when there is no more work. After all, we don't know if there will be more work later on for an IO object.

Conceptually, when writing a boost::asio extension, a simple view to take is to see the boost::asio::io_service as an event loop listening on an internal queue. Other objects, typically the service objects, post events on the queue. Once an event appears in the queue, boost::asio::io_service dispatches the event to a handler.
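The event loop view can be sketched with the standard library alone. This is not the boost::asio API: tiny_event_loop, post and run below are hypothetical names chosen for illustration, and the sketch is single threaded, whereas the real io_service can be posted to from other threads.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// hypothetical stand-in for the event loop view of io_service (not the boost::asio API)
class tiny_event_loop{
public:
  // post an event: the handler is queued, it is not run immediately
  void post(std::function<void()>handler){
    events_.push(std::move(handler));
  }
  // dispatch queued handlers until the internal queue is empty
  // (returns the number of handlers that were dispatched)
  std::size_t run(){
    std::size_t ndispatched{0};
    while(!events_.empty()){
      // take the handler off the queue before calling it since it may post new events
      std::function<void()>handler{std::move(events_.front())};
      events_.pop();
      handler();
      ++ndispatched;
    }
    return ndispatched;
  }
private:
  std::queue<std::function<void()>>events_;
};
// a 'service' posts an event and the loop dispatches it to the handler
inline std::vector<std::string>event_loop_demo(){
  tiny_event_loop loop;
  std::vector<std::string>log;
  loop.post([&](){
    log.push_back("first");
    // a handler can re-charge the loop by posting another event
    loop.post([&](){log.push_back("second");});
  });
  log.push_back("before run");
  std::size_t ndispatched{loop.run()};
  assert(ndispatched==2);
  (void)ndispatched;
  return log;
}
```

Nothing is dispatched before run is called, and run returns once no events are pending, which mirrors how the queue listener stops generating events when the handler no longer re-charges it.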

In the implementation of an asio service, we should be aware that as long as the service has not been activated (in this case, as long as async_deq has not been called), the service should not be listening on the underlying queue.

If at the application level there is outstanding work (we are blocking on the underlying queue) and we need to stop the blocked queue listeners, it is convenient to be able to stop them and receive an aborted error code in the corresponding handler. I have implemented this in boost::asio::simple_queue even though it is not mandatory for compatibility with boost::asio::simple_queue_listener. I'll show how it works in an example:

// timer pop function
constexpr static size_t tmoSec{3};
void timerPopped1(const boost::system::error_code&ec,boost::asio::deadline_timer*tmo,shared_ptr<boost::asio::simple_queue<string>>q1){
  // disable dequeuing (will stop async listener on q1)
  q1->disable_deq(true);
}
  ...

  // create a deadline timer and register it
  boost::asio::deadline_timer tmo(ios,boost::posix_time::seconds(10));
  tmo.async_wait(std::bind(timerPopped1,_1,&tmo,q1));

  ...

The IO object

Implementing the IO object is formulaic:

// forward decl
class queue_listener_impl;
template<typename Impl=queue_listener_impl>class basic_queue_listener_service;

// --- IO Object (used by client) -----------------------------
template<typename Service,typename Queue>
class basic_queue_listener:public boost::asio::basic_io_object<Service>{
public:
  // ctor
  explicit basic_queue_listener(boost::asio::io_service&io_service):
      boost::asio::basic_io_object<Service>(io_service) {
  }
  // async deq operation
  template <typename Handler>
  void async_deq(std::shared_ptr<Queue>q,Handler handler) {
    this->service.async_deq(this->implementation,q,handler);
  }
};
// typedef for using standard service object
template<typename T>
using simple_queue_listener=basic_queue_listener<basic_queue_listener_service<>,simple_queue<T>>;

The main points to notice are:

  1. there are two forward declarations, one for an implementation class and one for the service class
  2. the base class of basic_queue_listener, boost::asio::basic_io_object, knows about the boost::asio::io_service instance
  3. the async_deq method is non-blocking and invokes the service object passing the implementation object as parameter

The Service object

Only a single instance of the service object is created per io_service for one type of service. In this case, only one instance of boost::asio::basic_queue_listener_service is created for one Impl type. The implementation is straightforward:

// --- service class -----------------------------
// (for one io_service, only one object created)
template<typename Impl>
class basic_queue_listener_service:public boost::asio::io_service::service{
public:
  // required to have id of service
  static boost::asio::io_service::id id;

  // ctor
  explicit basic_queue_listener_service(boost::asio::io_service&io_service):
      boost::asio::io_service::service(io_service){
  }
  // dtor
  ~basic_queue_listener_service(){
  }
  // get a typedef for implementation
  using implementation_type=std::shared_ptr<Impl>;

  // mandatory (construct an implementation object)
  void construct(implementation_type&impl){
    impl.reset(new Impl(this->get_io_service()));
  }
  // mandatory (destroy an implementation object)
  void destroy(implementation_type&impl){
    impl.reset();
  }
  // async deq operation
  template <typename Handler,typename Queue>
  void async_deq(implementation_type&impl,std::shared_ptr<Queue>q,Handler handler){
    // this is a non-blocking operation so we are OK calling impl object in this thread
    impl->async_deq(impl,q,handler);
  }
private:
  // shutdown service (required)
  void shutdown_service(){
  }
};
// definition of id of service (required)
template <typename Impl>
boost::asio::io_service::id basic_queue_listener_service<Impl>::id;

The two interesting methods are construct and destroy. In construct I pass the io_service down to the implementation object since I will let the implementation post responses directly to the main io_service. Some people have mentioned that it would be cleaner to run an io_service directly in the service in a separate thread and let the implementation post responses back to the service. I chose not to do this in order to keep the code as simple as possible.

The destroy method makes sure that we release the implementation object. Notice that destroy is called when the corresponding IO object is destructed; shutdown_service, by contrast, is called when the io_service object is destructed.

The async_deq method does a non-blocking call which in the implementation object triggers a blocking wait on the underlying queue. This is important since we must ensure that we don't block the main thread of execution. Clearly, by running one or more threads in the service object we could have achieved the same thing. But again, I went for the simplest solution that is clean and easy to follow.

Also notice that the implementation object starts its own io_service with a boost::asio::io_service::work object so that the run method will continue to run even when no work is available.

The boost::asio::io_service::id is required even though we don't use it directly.

The Implementation object

The implementation object does the actual work. In this implementation it performs two tasks. First it blocks until an item appears on a queue. Once it has the item it posts it on an io_service. The io_service was passed to the implementation object by the service object and corresponds to the main io_service. That is, the de-queued items will appear in a handler specified by the client of the IO object.

The code kicks off a thread in the constructor. It also sets up an io_service which makes it simpler to pass information from the async_deq method to the thread. Remember, async_deq cannot block since blocking would block the client thread.

// --- implementation -----------------------------
class queue_listener_impl{
public:
  // ctor (set up work queue for io_service so we don't bail out when executing run())
  queue_listener_impl(boost::asio::io_service&post_io_service):
      impl_work_(new boost::asio::io_service::work(impl_io_service_)),
      impl_thread_([&](){impl_io_service_.run();}),
      post_io_service_(post_io_service){
  }
  // dtor (clear work queue, stop io service and join thread)
  ~queue_listener_impl(){
    impl_work_.reset(nullptr);
    impl_io_service_.stop();
    if(impl_thread_.joinable())impl_thread_.join();
  }
public:
  // dequeue message (post request to thread)
  template<typename Handler,typename Queue>
  void async_deq(std::shared_ptr<queue_listener_impl>impl,std::shared_ptr<Queue>tq,Handler handler){
    impl_io_service_.post(deq_operation<Handler,Queue>(impl,post_io_service_,tq,handler));
  }
private:
  // function object calling blocking deq() on queue
  template <typename Handler,typename Queue>
  class deq_operation{
  public:
    // ctor
    deq_operation(std::shared_ptr<queue_listener_impl>impl,boost::asio::io_service &io_service,std::shared_ptr<Queue>tq, Handler handler):
        wimpl_(impl),io_service_(io_service),work_(io_service),tq_(tq),handler_(handler) {
    }
    // function calling implementation object - runs in the thread created in ctor
    void operator()(){
      // make sure implementation object is still valid
      std::shared_ptr<queue_listener_impl>impl{wimpl_.lock()};

      // if valid, go ahead and do blocking call on queue, otherwise post aborted message
      if(impl){
        std::pair<bool,typename Queue::value_type>ret{tq_->deq()};
        boost::system::error_code ec=(!ret.first?boost::asio::error::operation_aborted:boost::system::error_code());
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,ec,ret.second));
      }else{
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,boost::asio::error::operation_aborted,typename Queue::value_type()));
      }
    }
  private:
    std::weak_ptr<queue_listener_impl>wimpl_;
    boost::asio::io_service&io_service_;
    boost::asio::io_service::work work_;
    std::shared_ptr<Queue>tq_;
    Handler handler_;
  };
  // private data
  boost::asio::io_service impl_io_service_;
  std::unique_ptr<boost::asio::io_service::work>impl_work_;
  std::thread impl_thread_;
  boost::asio::io_service&post_io_service_;
};

The std::weak_ptr is used in the thread to protect against the implementation object being destroyed while the thread is running. It's a simple hack which ensures that the thread won't access data in a non-existing object.
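The guard can be shown in isolation with a small standard-library sketch. The names owner, guarded_operation and run_guarded are hypothetical and only stand in for queue_listener_impl and deq_operation; the strings returned replace the real handler invocation.

```cpp
#include <cassert>
#include <memory>
#include <string>

// hypothetical stand-in for queue_listener_impl
struct owner{
  std::string name;
};
// operation holding only a weak reference to its owner (compare with deq_operation)
class guarded_operation{
public:
  explicit guarded_operation(std::shared_ptr<owner>const&o):wowner_(o){
  }
  // returns "ok:<name>" if the owner is still alive, otherwise "aborted"
  std::string operator()()const{
    // lock() yields an empty shared_ptr if the owner has already been destroyed
    std::shared_ptr<owner>o{wowner_.lock()};
    if(!o)return "aborted";
    // while 'o' is alive the owner cannot be destroyed underneath us
    return "ok:"+o->name;
  }
private:
  std::weak_ptr<owner>wowner_;
};
// run the operation with or without destroying the owner first
inline std::string run_guarded(bool destroy_owner_first){
  std::shared_ptr<owner>o{std::make_shared<owner>(owner{"impl"})};
  guarded_operation op(o);
  if(destroy_owner_first)o.reset();
  return op();
}
// usage
inline void guarded_operation_demo(){
  assert(run_guarded(false)=="ok:impl");
  assert(run_guarded(true)=="aborted");
}
```

The operation never extends the lifetime of its owner on its own; it only pins the owner for the duration of one call.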

Notice that a thread is created in the constructor which runs the boost::asio::io_service run method. The thread waits for requests to de-queue items from the underlying queue and executes operator() for each request. It is also interesting to notice that, should async_deq be called multiple times on the same implementation object, the requests will queue up in the local io_service queue. Even though it is not recommended because it is confusing, it is possible to make multiple async_deq calls on the same IO object, which then translate to multiple calls to the same implementation object.

If async_deq were called on the same IO object but with different queues, the second request would wait until the first one de-queues a message from its queue. This is not an ideal way of programming since from the client side it would appear as if the second queue is blocking. A better way is to dedicate one IO object per queue, each of which will then de-queue in its own separate thread.

An alternative design would have been to store the queue in the implementation object and not pass it as a parameter to the async_deq call. This causes its own problems which must be solved but would be an alternative and possibly better design.

Queue requirements

The underlying queue must satisfy some requirements:

  1. the queue must have a method deq which must be thread safe
  2. the value_type in the queue must be default constructible
  3. the value_type in the queue must be copy constructible
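Most of these requirements can be checked at compile time. The sketch below is not part of queue_listener.h; satisfies_queue_requirements, toy_queue and bad_queue are hypothetical names. Besides the listed points it also checks what deq_operation relies on in the code: a nested value_type and a deq() returning std::pair<bool,value_type>. Thread safety cannot be verified this way.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <type_traits>
#include <utility>

// hypothetical compile time check of the queue requirements (thread safety is not checkable)
template<typename Queue,typename T=typename Queue::value_type>
struct satisfies_queue_requirements:std::integral_constant<bool,
    std::is_default_constructible<T>::value&&
    std::is_copy_constructible<T>::value&&
    std::is_same<decltype(std::declval<Queue&>().deq()),std::pair<bool,T>>::value>{
};
// minimal queue with the right interface (NOT thread safe, for illustration only)
struct toy_queue{
  using value_type=std::string;
  std::pair<bool,value_type>deq(){
    if(q.empty())return std::make_pair(false,value_type());
    value_type item{q.front()};
    q.pop();
    return std::make_pair(true,item);
  }
  std::queue<value_type>q;
};
// a queue whose value_type has no default ctor fails the check
struct no_default{
  explicit no_default(int){}
};
struct bad_queue{
  using value_type=no_default;
  std::pair<bool,value_type>deq();
};
static_assert(satisfies_queue_requirements<toy_queue>::value,"toy_queue should qualify");
static_assert(!satisfies_queue_requirements<bad_queue>::value,"bad_queue should not qualify");

// usage
inline void queue_requirements_demo(){
  toy_queue tq;
  tq.q.push("hello");
  assert(tq.deq().first);
  assert(!tq.deq().first);
}
```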

A simple example

The example spawns a thread which starts inserting messages in a queue. At the same time a boost::asio::simple_queue_listener picks up messages from the queue. When the receiver has received all messages it does not re-charge the listener with a new async_deq call:

#include "queue_listener.h"
#include <boost/asio.hpp>
#include <boost/log/trivial.hpp>
#include <boost/lexical_cast.hpp>
#include <string>
#include <memory>
#include <thread>
#include <chrono>
using namespace std;
using namespace std::placeholders;

// some constants
constexpr size_t maxmsg1{10};
constexpr size_t tmoSeleepBetweenSendMs1{100};

// queue listener handler for queue 1
size_t nreceived1{0};
template<typename T>
void qhandler1(boost::system::error_code const&ec,T item,boost::asio::simple_queue_listener<T>*asioq,shared_ptr<boost::asio::simple_queue<string>>q1){
  // print item if error code is OK
  if(ec)BOOST_LOG_TRIVIAL(debug)<<"received item in qhandler1 (via asio), item: <invalid>, ec: "<<ec;
  else{
    BOOST_LOG_TRIVIAL(debug)<<"received item in qhandler1 (via asio), item: "<<item<<", ec: "<<ec;
    if(++nreceived1!=maxmsg1)asioq->async_deq(q1,std::bind(qhandler1<T>,_1,_2,asioq,q1));
  }
}
// queue sender for queue 1
size_t nsent{0};
void senderq1(shared_ptr<boost::asio::simple_queue<string>>q1){
  for(;nsent<maxmsg1;++nsent){
    string item{boost::lexical_cast<string>(nsent)};
    BOOST_LOG_TRIVIAL(debug)<<"sending item \""<<item<<"\" in separate thread ...";
    q1->enq(item);
    this_thread::sleep_for(std::chrono::milliseconds(tmoSeleepBetweenSendMs1));
  }
}
// test program
int main(){
  try{
    // underlying queue
    shared_ptr<boost::asio::simple_queue<string>>q1{new boost::asio::simple_queue<string>};

    // asio io service
    boost::asio::io_service ios;

    // asio queue listeners
    boost::asio::simple_queue_listener<string>qlistener1(ios);
    qlistener1.async_deq(q1,std::bind(qhandler1<string>,_1,_2,&qlistener1,q1));

    // run a sender thread, run io service and join sender thread
    std::thread thrq1{senderq1,q1};
    ios.run();
    thrq1.join();
  }
  catch(exception const&e){
    BOOST_LOG_TRIVIAL(debug)<<"caught exception: "<<e.what();
  }
}

Compilation and linking

I compiled the code using gcc 4.9 on RedHat and gcc 4.10 on Ubuntu 12.04.04 with the options -DBOOST_ALL_DYN_LINK -DASIO_STANDALONE. The -DASIO_STANDALONE option is needed when using C++11 features within boost::asio.

Conclusions

There are some improvements that could be made. For example, a mechanism preventing calls to the async_deq method while there are still outstanding requests could be implemented. This is not as easy as it seems because of race conditions, and I therefore skipped it in this implementation.

I didn't bother with implementing synchronous operations. Neither did I implement methods which throw exceptions as opposed to returning an error code. In a full-fledged implementation these functionalities must be added. But so far, I see this implementation as an extension to a specific implementation I did for a specific project. But no doubt, the queue extension will evolve to include more features and functionalities.

There is a piece of functionality that is missing: there is no asio support on the side adding items to the queue. Say we want to throttle on the queue. That is, say we want to ensure that there are no more than 10 items at any time in the queue. This could be achieved by implementing a boost::asio::queue_sender which generates events whenever there is room to add another item to the queue. I haven't implemented this yet but will probably do it in the near future and post it as the next blog entry.
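To make the throttling idea concrete, here is a rough standard-library sketch of the queue side only. It is not the boost::asio::queue_sender mentioned above, which does not exist yet; bounded_queue, try_enq and try_deq are hypothetical names. A sender IO object could, under these assumptions, generate its event whenever try_enq would succeed.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// hypothetical bounded queue (not part of simple_queue.h)
template<typename T>
class bounded_queue{
public:
  explicit bounded_queue(std::size_t max_items):max_items_(max_items){
  }
  // add an item only if there is room (returns false when the queue is full)
  bool try_enq(T t){
    std::lock_guard<std::mutex>lock(mtx_);
    if(q_.size()>=max_items_)return false;
    q_.push(std::move(t));
    return true;
  }
  // remove an item if one is available (return.first == false if queue is empty)
  std::pair<bool,T>try_deq(){
    std::lock_guard<std::mutex>lock(mtx_);
    if(q_.empty())return std::make_pair(false,T{});
    std::pair<bool,T>ret{true,std::move(q_.front())};
    q_.pop();
    return ret;
  }
private:
  mutable std::mutex mtx_;
  std::size_t max_items_;
  std::queue<T>q_;
};
// try to add 'attempts' items to a queue with room for 'capacity' items
// (returns the number of items that were accepted)
inline std::size_t throttle_demo(std::size_t capacity,std::size_t attempts){
  bounded_queue<int>q(capacity);
  std::size_t accepted{0};
  for(std::size_t i=0;i<attempts;++i){
    if(q.try_enq(static_cast<int>(i)))++accepted;
  }
  assert(accepted<=capacity);
  return accepted;
}
```

With a capacity of 10, any further try_enq fails until a try_deq makes room again.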

The default queue boost::asio::simple_queue should no doubt contain more features. Here I minimized the queue just to save some space in the blog entry. Methods like size, capacity etc. should be added as well as the possibility to incorporate moving of objects as opposed to copying them. This will have to wait until the next round.
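As a hint of what moving instead of copying could look like, the sketch below passes a move-only type through a queue. moving_queue and its try_deq are hypothetical and not part of simple_queue.h; the blocking and cancellation logic of simple_queue is left out to keep the sketch short.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// hypothetical queue that moves items in and out instead of copying them
template<typename T>
class moving_queue{
public:
  // take the item by value and move it into the container
  void enq(T t){
    std::lock_guard<std::mutex>lock(mtx_);
    q_.push(std::move(t));
  }
  // move the front item out (return.first == false if queue is empty)
  std::pair<bool,T>try_deq(){
    std::lock_guard<std::mutex>lock(mtx_);
    if(q_.empty())return std::make_pair(false,T{});
    std::pair<bool,T>ret{true,std::move(q_.front())};
    q_.pop();
    return ret;
  }
  // number of items currently in the queue
  std::size_t size()const{
    std::lock_guard<std::mutex>lock(mtx_);
    return q_.size();
  }
private:
  mutable std::mutex mtx_;
  std::queue<T>q_;
};
// pass a move-only item through the queue (returns the value or -1 if queue was empty)
inline int move_demo(){
  moving_queue<std::unique_ptr<int>>q;
  q.enq(std::unique_ptr<int>(new int(42)));
  assert(q.size()==1);
  std::pair<bool,std::unique_ptr<int>>item{q.try_deq()};
  return item.first?*item.second:-1;
}
```

With this approach the value_type still has to be default constructible, but it no longer has to be copy constructible.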

The full implementation

boost::asio::queue_listener

#ifndef __QUEUE_LISTENER_H__
#define __QUEUE_LISTENER_H__
#include "simple_queue.h"
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <memory>
#include <utility>
#include <thread>
namespace boost{
namespace asio{

// forward decl
class queue_listener_impl;
template<typename Impl=queue_listener_impl>class basic_queue_listener_service;

// --- IO Object (used by client) -----------------------------
template<typename Service,typename Queue>
class basic_queue_listener:public boost::asio::basic_io_object<Service>{
public:
  // ctor
  explicit basic_queue_listener(boost::asio::io_service&io_service):
      boost::asio::basic_io_object<Service>(io_service) {
  }
  // async deq operation
  template <typename Handler>
  void async_deq(std::shared_ptr<Queue>q,Handler handler) {
    this->service.async_deq(this->implementation,q,handler);
  }
};
// typedef for using standard service object
template<typename T>
using simple_queue_listener=basic_queue_listener<basic_queue_listener_service<>,simple_queue<T>>;

// --- service class -----------------------------
// (for one io_service, only one object created)
template<typename Impl>
class basic_queue_listener_service:public boost::asio::io_service::service{
public:
  // required to have id of service
  static boost::asio::io_service::id id;

  // ctor
  explicit basic_queue_listener_service(boost::asio::io_service&io_service):
      boost::asio::io_service::service(io_service){
  }
  // dtor
  ~basic_queue_listener_service(){
  }
  // get a typedef for implementation
  using implementation_type=std::shared_ptr<Impl>;

  // mandatory (construct an implementation object)
  void construct(implementation_type&impl){
    impl.reset(new Impl(this->get_io_service()));
  }
  // mandatory (destroy an implementation object)
  void destroy(implementation_type&impl){
    impl.reset();
  }
  // async deq operation
  template <typename Handler,typename Queue>
  void async_deq(implementation_type&impl,std::shared_ptr<Queue>q,Handler handler){
    // this is a non-blocking operation so we are OK calling impl object in this thread
    impl->async_deq(impl,q,handler);
  }
private:
  // shutdown service (required)
  void shutdown_service(){
  }
};
// definition of id of service (required)
template <typename Impl>
boost::asio::io_service::id basic_queue_listener_service<Impl>::id;

// --- implementation -----------------------------
class queue_listener_impl{
public:
  // ctor (set up work queue for io_service so we don't bail out when executing run())
  queue_listener_impl(boost::asio::io_service&post_io_service):
      impl_work_(new boost::asio::io_service::work(impl_io_service_)),
      impl_thread_([&](){impl_io_service_.run();}),
      post_io_service_(post_io_service){
  }
  // dtor (clear work queue, stop io service and join thread)
  ~queue_listener_impl(){
    impl_work_.reset(nullptr);
    impl_io_service_.stop();
    if(impl_thread_.joinable())impl_thread_.join();
  }
public:
  // dequeue message (post request to thread)
  template<typename Handler,typename Queue>
  void async_deq(std::shared_ptr<queue_listener_impl>impl,std::shared_ptr<Queue>tq,Handler handler){
    impl_io_service_.post(deq_operation<Handler,Queue>(impl,post_io_service_,tq,handler));
  }
private:
  // function object calling blocking deq() on queue
  template <typename Handler,typename Queue>
  class deq_operation{
  public:
    // ctor
    deq_operation(std::shared_ptr<queue_listener_impl>impl,boost::asio::io_service &io_service,std::shared_ptr<Queue>tq, Handler handler):
        wimpl_(impl),io_service_(io_service),work_(io_service),tq_(tq),handler_(handler) {
    }
    // function calling implementation object - runs in the thread created in ctor
    void operator()(){
      // make sure implementation object is still valid
      std::shared_ptr<queue_listener_impl>impl{wimpl_.lock()};

      // if valid, go ahead and do blocking call on queue, otherwise post aborted message
      if(impl){
        std::pair<bool,typename Queue::value_type>ret{tq_->deq()};
        boost::system::error_code ec=(!ret.first?boost::asio::error::operation_aborted:boost::system::error_code());
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,ec,ret.second));
      }else{
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,boost::asio::error::operation_aborted,typename Queue::value_type()));
      }
    }
  private:
    std::weak_ptr<queue_listener_impl>wimpl_;
    boost::asio::io_service&io_service_;
    boost::asio::io_service::work work_;
    std::shared_ptr<Queue>tq_;
    Handler handler_;
  };
  // private data
  boost::asio::io_service impl_io_service_;
  std::unique_ptr<boost::asio::io_service::work>impl_work_;
  std::thread impl_thread_;
  boost::asio::io_service&post_io_service_;
};
}
}
#endif

boost::asio::simple_queue


#ifndef __SIMPLE_QUEUE_H__
#define __SIMPLE_QUEUE_H__
#include <utility>
#include <queue>
#include <mutex>
#include <condition_variable>
namespace boost{
namespace asio{

// a simple thread safe queue used as default queue in boost::asio::queue_listener
template<typename T,typename Container=std::queue<T>>
class simple_queue{
public:
  // typedef for value stored in queue
  // (need this so we can create an item with default ctor)
  using value_type=T;

  // ctors,assign,dtor
  // (the mutex and condition variable members are not movable, so moving is disabled explicitly)
  simple_queue()=default;
  simple_queue(simple_queue const&)=delete;
  simple_queue(simple_queue&&)=delete;
  simple_queue&operator=(simple_queue const&)=delete;
  simple_queue&operator=(simple_queue&&)=delete;
  ~simple_queue()=default;

  // put a message into queue
  void enq(T t){
    std::lock_guard<std::mutex>lock(mtx_);
    q_.push(t);
    cond_.notify_all();
  }
  // dequeue a message (return.first == false if deq() was disabled)
  std::pair<bool,T>deq(){
    std::unique_lock<std::mutex>lock(mtx_);
    cond_.wait(lock,[&](){return !deq_enabled_||!q_.empty();});

    // if deq is disabled or queue is empty return 
    if(!deq_enabled_||q_.empty()){
      return std::make_pair(false,T{});
    }
    // check if we have a message
    std::pair<bool,T>ret{std::make_pair(true,q_.front())};
    q_.pop();
    return ret;
  }
  // cancel deq operations (will also release blocking threads)
  void disable_deq(bool disable){
    std::unique_lock<std::mutex>lock(mtx_);
    deq_enabled_=!disable;
    cond_.notify_all();
  }
  // check if queue is empty
  bool empty()const{
    std::unique_lock<std::mutex>lock(mtx_);
    return q_.empty();
  }
private:
  mutable std::mutex mtx_;
  mutable std::condition_variable cond_;
  bool deq_enabled_=true;
  Container q_;
};
}
}
#endif
