Thursday, August 7, 2014

boost::asio Queues Revamped - Part II

As I get older and crankier, I find myself more and more exasperated with the great inflexible sets of rules that many companies try to pour into concrete and sanctify as methods. The idea that a single method should govern even two different projects is highly suspect: the differences between projects are much more important than the similarities.

-- Tom DeMarco, Controlling Software Projects

Introduction

This entry is short and contains a few modifications to the previous entry together with some new features. Since most of the interesting things are in the code, this entry is almost all code with only a few lines of text interspersed.

Updates to boost::asio queues

In my previous blog entry Using queues with boost::asio - Part I I made a few mistakes which I will correct here. I will also add an asynchronous queue_sender which blocks when the queue is full and invokes a handler when it has inserted an item in the queue. Additionally I changed the API slighly so that a queue_listener (and also the new queue_sender) takes the queue as a constructor parameter

Problems, mistakes and other idiosyncrasies

I try to publish what I believe is interesting and usefull code as soon as I have used it in one or more designs or projects. Even though the code in my blog entries are rather small, I do get comments from collegues and via email pointing out mistakes, things that could have been done better or been done in a different way. I also do get emails where developers have used the code in their own projects!

I'm fully aware of that since the code has not been reviewed by many people, there are mistakes and there are things that could have been done better.

In case you notice mistakes, bugs, or code that are simply incorrect, please feel free to contact me at hansewetz@hotmail.com.

Mistakes in my previous design

In the previous design it was possible to call async_deq passing a queue followed by one or more calls passing in the same or another queue. Since the implementation object runs a single thread, the requests would be queued in the io_service in the implementation object. As a consequence, if async_deq was called on q1 and then called on q2, de-queuing on q2 could be waiting forever if there were no messages in q1.

This problem is corrected by having each IO object handle a single queue which is passed as a parameter in the constructor. The downside is that it is no longer possible to use the same IO object to simultaneously listen on different queues.

I could have kept the original design by managing multiple threads in the implementation object at the cost of a more complex design. The current design is simple and easy to understand so I'll stick with it

New features and an example

The main new feature in the design is a queue_sender which blocks when the queue is full. The underlying queue takes a parameter specifying the maximum number of items the queue can hold. Here is an example:

#include "queue_sender.h"
#include <boost/asio.hpp>
#include <boost/log/trivial.hpp>
#include <string>
#include <memory>
#include <chrono>
using namespace std;
using namespace std::placeholders;

// asio queue stuff
shared_ptr<boost::asio::simple_queue<int>>q{new boost::asio::simple_queue<int>(3)};
boost::asio::io_service ios;
boost::asio::simple_queue_sender<int>qsender(::ios,q);

// handler for queue sender
template<typename T>
void qsender_handler(boost::system::error_code const&ec,T item){
  BOOST_LOG_TRIVIAL(debug)<<"sending item in qlistener_handler (via asio), item: "<<item<<", ec: "<<ec;
  qsender.async_enq(item+1,std::bind(qsender_handler<T>,_1,item+1));
  std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
// test program
int main(){
  try{
    // start sending messages
    int startItem{1};
    qsender.async_enq(startItem,std::bind(qsender_handler<int>,_1,startItem));

    // kick off io service
    ::ios.run();
  }
  catch(exception const&e){
    BOOST_LOG_TRIVIAL(error)<<"cought exception: "<<e.what();
  }
}

For brevity I've skipped error checking. Specifically I'm not checking the ec error code from boost::asio.

The sampel code above will insert exactly 3 integers (1, 2, 3) into the queue after which the async_deque blocks on the queue waiting for the queue to have items removed. As soon as an item is removed, the qsender_handler handler is invoked. In this example there is no code de-queuing messages so the run method will never return.

A more realistic example

A slighly more realistic example is one where a queue sender removes messages from the queue:

#include "queue_listener.h"
#include "queue_sender.h"
#include <boost/asio.hpp>
#include <boost/log/trivial.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <string>
#include <memory>
#include <thread>
using namespace std;
using namespace std::placeholders;

// asio queue stuff
shared_ptr<boost::asio::simple_queue<int>>q{new boost::asio::simple_queue<int>(3)};
boost::asio::io_service ios;
boost::asio::simple_queue_listener<int>qlistener(::ios,q);
boost::asio::simple_queue_sender<int>qsender(::ios,q);

// max #of messages to send/receive
constexpr size_t maxmsg{10};

// handler for queue listener
template<typename T>
void qlistener_handler(boost::system::error_code const&ec,T item, int nreceived){
  // print item if error code is OK
  BOOST_LOG_TRIVIAL(debug)<<"received item in qlistener_handler (via asio), item: "<<item<<", ec: "<<ec;
  if(nreceived+1<maxmsg)qlistener.async_deq(std::bind(qlistener_handler<T>,_1,_2,nreceived+1));
}
// handler for queue sender
template<typename T>
void qsender_handler(boost::system::error_code const&ec,int item,int nsent){
  BOOST_LOG_TRIVIAL(debug)<<"sending item in qlistener_handler (via asio), item: "<<item<<", ec: "<<ec<<", queue size: "<<q->size();
  if(nsent+1<maxmsg)qsender.async_enq(item+1,std::bind(qsender_handler<T>,_1,item+1,nsent+1));
}
// test program
int main(){
  try{
    // setup timer to trigger in 3 seconds after which we start listening to messages
    boost::asio::deadline_timer tmo(::ios,boost::posix_time::seconds(1));
    tmo.async_wait([&](const boost::system::error_code&ec){qlistener.async_deq(std::bind(qlistener_handler<int>,_1,_2,0));});

    // start sending messages
    int startItem{1};
    qsender.async_enq(startItem,std::bind(qsender_handler<int>,_1,startItem,0));

    // kick off io service
    ::ios.run();
  }
  catch(exception const&e){
    BOOST_LOG_TRIVIAL(error)<<"cought exception: "<<e.what();
  }
}

The program first sets a deadline_timer which when popping starts a queue_listener. This is so that the queue_sender will have a chance to fill up the queue and start blocking. Ones the timer pops the queue_listener removes items form the queue and the queue_sender invokes the qlistener_handler handler which will insert more messages in the queue.

In the example,the handler functions receives the number of messages processed so far. Ones the number of messages reaches maxmsg the queue functions are not reloaded and we'll exit the boost::asio::run function.

The output looks like:

sending item in qlistener_handler (via asio), item: 1, ec: system:0, queue size: 1
sending item in qlistener_handler (via asio), item: 2, ec: system:0, queue size: 2
sending item in qlistener_handler (via asio), item: 3, ec: system:0, queue size: 3
received item in qlistener_handler (via asio), item: 1, ec: system:0
received item in qlistener_handler (via asio), item: 2, ec: system:0
received item in qlistener_handler (via asio), item: 3, ec: system:0
sending item in qlistener_handler (via asio), item: 4, ec: system:0, queue size: 1
sending item in qlistener_handler (via asio), item: 5, ec: system:0, queue size: 2
sending item in qlistener_handler (via asio), item: 6, ec: system:0, queue size: 3
received item in qlistener_handler (via asio), item: 4, ec: system:0 
received item in qlistener_handler (via asio), item: 5, ec: system:0 
received item in qlistener_handler (via asio), item: 6, ec: system:0 
sending item in qlistener_handler (via asio), item: 7, ec: system:0, queue size: 1
sending item in qlistener_handler (via asio), item: 8, ec: system:0, queue size: 2
sending item in qlistener_handler (via asio), item: 9, ec: system:0, queue size: 3
received item in qlistener_handler (via asio), item: 7, ec: system:0 
received item in qlistener_handler (via asio), item: 8, ec: system:0 
received item in qlistener_handler (via asio), item: 9, ec: system:0 
sending item in qlistener_handler (via asio), item: 10, ec: system:0, queue size: 1
received item in qlistener_handler (via asio), item: 10, ec: system:0

Notice that the three first messages are inserted immediately. After the first three messages the async_enq call blocks. Once a message is de-queued, the qsender_handler is invoked which inserts a new message into the queue.

The feature of blocking when the queue is full and invokinga callback handler ones it is possible to insert more messages is useful for applications where throttling is important.

Design decisions

There are many ways the design (and implementation) of a boost::asio queue based IO object(s) could have been done. I choose to separate en-queuing and de-queuing into a queue_listener and a queue_sender mostly because there is no obvious reason why both en-queuing and de-queuing has to be done through boost::asio. The underlying queue can be used by any client code and is a generic queue not tied directly to boost::asio even though it must satisfy certain characteristics to work properly with boost::asio.

Alternativly, a single IO object, a queue could have been implemented suporting both async_enq and async_deq. Time will tell if I took the righ decision. If it turns out that having a single IO objcet is simpler, I'll change the design.M/p>

Another design decision I took was to handle only one single queue in an IO object. The reason for this was for simplicity in both design and coding. Potentially it could be to beneficial support listening on multiple queues inside a single IO object. However, I will leave this as an exercise for the future when I have more time on my hands.

Yet another example

The example below illustrates how to read from a file descriptor (or socket), process the data, insert it into a queue and finally read it from the queue - all done as asynchronous operations.

The code creates a posix asio file descriptor reading from stdin. It also creates a queue_sender and a queue_listener. Each line is read (asynchronously) from the fie descriptor, then asynchronously inserted into the queue using a queue_sender. At the same time a queue_listener reads messages asynchronously form the queue and prints them to the screen.

Admittedly, the application is not very useful - however, it does show how asio IO objects can cooperate without explicitly using threads at the application level.

Here is the code:

#include "queue_listener.h"
#include "queue_sender.h"
#include <boost/asio.hpp>
#include <boost/asio/posix/basic_descriptor.hpp>
#include <boost/log/trivial.hpp>
#include <string>
#include <memory>
using namespace std;
using namespace std::placeholders;

// asio io_service
boost::asio::io_service ios;

// variables enabling us to know when we are done
atomic<size_t>nlinesRemaining{0};
atomic<bool>done{false};

// queue stuff
shared_ptr<boost::asio::simple_queue<string>>q{new boost::asio::simple_queue<string>(3)};
boost::asio::simple_queue_listener<string>qlistener(::ios,q);
boost::asio::simple_queue_sender<string>qsender(::ios,q);

// posix stream
boost::asio::posix::stream_descriptor fd_read(::ios,0);
boost::asio::streambuf fd_data;

// handler for queue listener
template<typename T>
void qlistener_handler(boost::system::error_code const&ec,T item){
  if(!ec){
    BOOST_LOG_TRIVIAL(debug)<<"received item in queue: \""<<item<<"\"";
    qlistener.async_deq(std::bind(qlistener_handler<T>,_1,_2));
    --nlinesRemaining;
  }else{
    // ignore errors for right now
    // ...
  }
  // check if we should shut down queues
  if(done&&nlinesRemaining==0){
    q->disable_deq(true);
    q->disable_enq(true);
  }
}
// handler for fd
void fd_handler(boost::system::error_code ec,size_t size){
  if(ec.value()==boost::asio::error::eof&&size==0){
    // we are done  - but can't shut down queues until they are empty
    done=true;
    fd_read.cancel();
    fd_read.close();
  }else
  if(ec.value()){
    // ignore errors for right now
    // ...
  }else{
    // get read line and enque it asynchronously into the queue
    istream is(&fd_data);
    string item;
    getline(is,item);
    ++nlinesRemaining;
    qsender.async_enq(item,[&](boost::system::error_code const&ec){});

    // continue reading from fd
    boost::asio::async_read_until(fd_read,fd_data,'\n',std::bind(fd_handler,_1,_2));
  }
}
// test program
int main(){
  try{
    // read line by line from stdin
    boost::asio::async_read_until(fd_read,fd_data,'\n',std::bind(fd_handler,_1,_2));

    // start listening to messages from queue
    qlistener.async_deq(std::bind(qlistener_handler<string>,_1,_2));

    // run asio 
    ::ios.run();
  }
  catch(exception const&e){
    BOOST_LOG_TRIVIAL(error)<<"cought exception: "<<e.what();
  }
}

The program can be tested as follows:

cat Makefile | test2

The result would look like:

received item in queue: "all:"
received item in queue: "   $(MAKE) -f Makefile.test1"
received item in queue: "   $(MAKE) -f Makefile.test2"
received item in queue: ""
received item in queue: "clean:"
received item in queue: "   $(MAKE) -f Makefile.test1 clean" 
received item in queue: "   $(MAKE) -f Makefile.test2 clean" 

A thing to take note of is that we must be careful to terminate the boost::asio::run function correctly. Here I use two variable, nlinesRemaining tracking number of items remaining to pull out of the queue. The other variable, done is set when there is nothing more to read.

Conclusions

Even though boost::asio is mostly focused on streams of bytes, I can't see a good reason for not supporting basic data structures such as queues in an asynchronous boost::asio environment. A typical use case is the reception of messages from a queue which are serialized through a socket as bytes. Inversely, receiving bytes through a socket which are packed into messages and sent trought a queue is also a typical use case. In the latter case, the ability to block in case the queue is getting full can become important in some environment.

Notice that blocking does not refer to blocking the boost::asio::run event loop, but rather blocking on some internal queue which is invisible to the client code.

No doubt there will be changes to the boost::asio queue support ones being seriously used. When changes are made I'll post them on this blog.

The full code

The implementation of the queue_listener is simple as is the implementation of the queue_sender. The full code for boost::asio queue support together with the default simple_queue is shown here:

simple_queue.h

#ifndef __SIMPLE_QUEUE_H__
#define __SIMPLE_QUEUE_H__
#include <utility>
#include <queue>
#include <mutex>
#include <condition_variable>
namespace boost{
namespace asio{

// a simple thread safe queue used as default queue in boost::asio::queue_listener
template<typename T,typename Container=std::queue<T>>
class simple_queue{
public:
  // typedef for value stored in queue
  // (need this so we can create an item with default ctor)
  using value_type=T;

  // ctors,assign,dtor
  simple_queue(std::size_t maxsize):maxsize_(maxsize){}
  simple_queue(simple_queue const&)=delete;
  simple_queue(simple_queue&&)=default;
  simple_queue&operator=(simple_queue const&)=delete;
  simple_queue&operator=(simple_queue&&)=default;
  ~simple_queue()=default;

  // put a message into queue
  bool enq(T t){
    std::unique_lock<std::mutex>lock(mtx_);
    cond_.wait(lock,[&](){return !enq_enabled_||q_.size()<maxsize_;});
    if(!enq_enabled_)return false;
    q_.push(t);
    cond_.notify_all();
    return true;
  }
  // dequeue a message (return.first == false if deq() was disabled)
  std::pair<bool,T>deq(){
    std::unique_lock<std::mutex>lock(mtx_);
    cond_.wait(lock,[&](){return !deq_enabled_||!q_.empty();});

    // if deq is disabled or queue is empty return 
    if(!deq_enabled_||q_.empty()){
      return std::make_pair(false,T{});
    }
    // check if we have a message
    std::pair<bool,T>ret{std::make_pair(true,q_.front())};
    q_.pop();
    cond_.notify_all();
    return ret;
  }
  // cancel deq operations (will also release blocking threads)
  void disable_deq(bool disable){
    std::unique_lock<std::mutex>lock(mtx_);
    deq_enabled_=!disable;
    cond_.notify_all();
  }
  // cancel enq operations (will also release blocking threads)
  void disable_enq(bool disable){
    std::unique_lock<std::mutex>lock(mtx_);
    enq_enabled_=!disable;
    cond_.notify_all();
  }
  // set max size of queue
  void set_maxsize(std::size_t maxsize){
    std::unique_lock<std::mutex>lock(mtx_);
    maxsize_=maxsize;
    cond_.notify_all();
  }
  // check if queue is empty
  bool empty()const{
    std::unique_lock<std::mutex>lock(mtx_);
    return q_.empty();
  }
  // get #of items in queue
  std::size_t size()const{
    std::unique_lock<std::mutex>lock;
    return q_.size();
  }
  // get max items in queue
  std::size_t maxsize()const{
    std::unique_lock<std::mutex>lock(mtx_);
    return maxsize_;
  }
private:
  std::size_t maxsize_;
  mutable std::mutex mtx_;
  mutable std::condition_variable cond_;
  bool deq_enabled_=true;
  bool enq_enabled_=true;
  Container q_;
};
}
}
#endif

queue_listener.h

#ifndef __QUEUE_LISTENER_H__
#define __QUEUE_LISTENER_H__
#include "simple_queue.h"
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <thread>
namespace boost{
namespace asio{

// forward decl
class queue_listener_impl;
template<typename Impl=queue_listener_impl>class basic_queue_listener_service;

// --- IO Object (used by client) -----------------------------
template<typename Service,typename Queue>
class basic_queue_listener:public boost::asio::basic_io_object<Service>{
public:
  // ctor
  explicit basic_queue_listener(boost::asio::io_service&io_service,std::shared_ptr<Queue>q):
      boost::asio::basic_io_object<Service>(io_service),q_(q){
  }
  // async deq operation
  template <typename Handler>
  void async_deq(Handler handler) {
    this->service.async_deq(this->implementation,q_,handler);
  }
private:
  std::shared_ptr<Queue>q_;
};
// typedef for using standard service object
template<typename T>
using simple_queue_listener=basic_queue_listener<basic_queue_listener_service<>,simple_queue<T>>;

// --- service class -----------------------------
// (for one io_service, only one object created)
template<typename Impl>
class basic_queue_listener_service:public boost::asio::io_service::service{
public:
  // required to have id of service
 static boost::asio::io_service::id id;

  // ctor
  explicit basic_queue_listener_service(boost::asio::io_service&io_service):
      boost::asio::io_service::service(io_service){
  }
  // dtor
  ~basic_queue_listener_service(){
  }
  // get a typedef  for implementation
  using implementation_type=std::shared_ptr<Impl>;

  // mandatory (construct an implementation object)
  void construct(implementation_type&impl){
      impl.reset(new Impl(this->get_io_service()));
  }
  // mandatory (destroy an implementation object)
  void destroy(implementation_type&impl){
    impl.reset();
  }
  // async sync deq operation
  template <typename Handler,typename Queue>
  void async_deq(implementation_type&impl,std::shared_ptr<Queue>q,Handler handler){
    // this is a non-blocking operation so we are OK calling impl object in this thread
    impl->async_deq(impl,q,handler);
  }
private:
  // shutdown service (required)
  void shutdown_service(){
  }
};
// definition of id of service (required)
template <typename Impl>
boost::asio::io_service::id basic_queue_listener_service<Impl>::id;

// --- implementation -----------------------------
class queue_listener_impl{
public:
  // ctor (set up work queue for io_service so we don't bail out when executing run())
  queue_listener_impl(boost::asio::io_service&post_io_service):
      impl_work_(new boost::asio::io_service::work(impl_io_service_)),
      impl_thread_([&](){impl_io_service_.run();}),
      post_io_service_(post_io_service){
  }
  // dtor (clear work queue, stop io service and join thread)
  ~queue_listener_impl(){
    impl_work_.reset(nullptr);
    impl_io_service_.stop();
    if(impl_thread_.joinable())impl_thread_.join();
  }
public:
  // deque message (post request to thread)
  template<typename Handler,typename Queue>
  void async_deq(std::shared_ptr<queue_listener_impl>impl,std::shared_ptr<Queue>q,Handler handler){
    impl_io_service_.post(deq_operation<Handler,Queue>(impl,post_io_service_,q,handler));
  }
private:
  // function object calling blocking deq() on queue
  template <typename Handler,typename Queue>
  class deq_operation{
  public:
    // ctor
    deq_operation(std::shared_ptr<queue_listener_impl>impl,boost::asio::io_service&io_service,std::shared_ptr<Queue>q,Handler handler):
        wimpl_(impl),io_service_(io_service),work_(io_service),q_(q),handler_(handler) {
    }
    // function calling implementation object - runs in the thread created in ctor
    void operator()(){
      // make sure implementation object is still valid
      std::shared_ptr<queue_listener_impl>impl{wimpl_.lock()};

      // if valid, go ahead and do blocking call on queue, otherwise post aborted message
      if(impl){
        std::pair<bool,typename Queue::value_type>ret{q_->deq()};
        boost::system::error_code ec=(!ret.first?boost::asio::error::operation_aborted:boost::system::error_code());
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,ec,ret.second));
      }else{
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,boost::asio::error::operation_aborted,typename Queue::value_type()));
      }
    }
  private:
    std::weak_ptr<queue_listener_impl>wimpl_;
    boost::asio::io_service&io_service_;
    boost::asio::io_service::work work_;
    std::shared_ptr<Queue>q_;
    Handler handler_;
  };
  // private data
  boost::asio::io_service impl_io_service_;
  std::unique_ptr<boost::asio::io_service::work>impl_work_;
  std::thread impl_thread_;
  boost::asio::io_service&post_io_service_;
};
}

queue_sender.h

#ifndef __QUEUE_SENDER_H__
#define __QUEUE_SENDER_H__
#include "simple_queue.h"
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <thread>
namespace boost{
namespace asio{

// forward decl
class queue_sender_impl;
template<typename Impl=queue_sender_impl>class basic_queue_sender_service;

// --- IO Object (used by client) -----------------------------
template<typename Service,typename Queue>
class basic_queue_sender:public boost::asio::basic_io_object<Service>{
public:
  // ctor
  explicit basic_queue_sender(boost::asio::io_service&io_service,std::shared_ptr<Queue>q):
      boost::asio::basic_io_object<Service>(io_service),q_(q){
  }
  // async enq operation
  template <typename Handler>
  void async_enq(typename Queue::value_type val,Handler handler) {
    this->service.async_enq(this->implementation,q_,val,handler);
  }
private:
  std::shared_ptr<Queue>q_;
};
// typedef for using standard service object
template<typename T>
using simple_queue_sender=basic_queue_sender<basic_queue_sender_service<>,simple_queue<T>>;

// --- service class -----------------------------
// (for one io_service, only one object created)
template<typename Impl>
class basic_queue_sender_service:public boost::asio::io_service::service{
public:
  // required to have id of service
 static boost::asio::io_service::id id;

  // ctor
  explicit basic_queue_sender_service(boost::asio::io_service&io_service):
      boost::asio::io_service::service(io_service){
  }
  // dtor
  ~basic_queue_sender_service(){
  }
  // get a typedef  for implementation
  using implementation_type=std::shared_ptr<Impl>;

  // mandatory (construct an implementation object)
  void construct(implementation_type&impl){
      impl.reset(new Impl(this->get_io_service()));
  }
  // mandatory (destroy an implementation object)
  void destroy(implementation_type&impl){
    impl.reset();
  }
  // async sync enq operation
  template <typename Handler,typename Queue>
  void async_enq(implementation_type&impl,std::shared_ptr<Queue>q,typename Queue::value_type val,Handler handler){
    // this is a non-blocking operation so we are OK calling impl object in this thread
    impl->async_enq(impl,q,val,handler);
  }
private:
  // shutdown service (required)
  void shutdown_service(){
  }
};
// definition of id of service (required)
template <typename Impl>
boost::asio::io_service::id basic_queue_sender_service<Impl>::id;

// --- implementation -----------------------------
class queue_sender_impl{
public:
  // ctor (set up work queue for io_service so we don't bail out when executing run())
  queue_sender_impl(boost::asio::io_service&post_io_service):
      impl_work_(new boost::asio::io_service::work(impl_io_service_)),
      impl_thread_([&](){impl_io_service_.run();}),
      post_io_service_(post_io_service){
  }
  // dtor (clear work queue, stop io service and join thread)
  ~queue_sender_impl(){
    impl_work_.reset(nullptr);
    impl_io_service_.stop();
    if(impl_thread_.joinable())impl_thread_.join();
  }
public:
  // enque message (post request to thread)
  template<typename Handler,typename Queue>
  void async_enq(std::shared_ptr<queue_sender_impl>impl,std::shared_ptr<Queue>tq,typename Queue::value_type val,Handler handler){
    impl_io_service_.post(enq_operation<Handler,Queue>(impl,post_io_service_,tq,val,handler));
  }
private:
  // function object calling blocking enq() on queue
  template <typename Handler,typename Queue>
  class enq_operation{
  public:
    // ctor
    enq_operation(std::shared_ptr<queue_sender_impl>impl,boost::asio::io_service &io_service,std::shared_ptr<Queue>tq,typename Queue::value_type val,Handler handler):
        wimpl_(impl),io_service_(io_service),work_(io_service),tq_(tq),val_(val),handler_(handler) {
    }
    // function calling implementation object - runs in the thread created in ctor
    void operator()(){
      // make sure implementation object is still valid
      std::shared_ptr<queue_sender_impl>impl{wimpl_.lock()};

      // if valid, go ahead and do (potentially) blocking call on queue, otherwise post aborted message
      if(impl){
        bool ret{tq_->enq(val_)};
        boost::system::error_code ec=(!ret?boost::asio::error::operation_aborted:boost::system::error_code());
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,ec));
      }else{
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,boost::asio::error::operation_aborted));
      }
    }
  private:
    std::weak_ptr<queue_sender_impl>wimpl_;
    boost::asio::io_service&io_service_;
    boost::asio::io_service::work work_;
    std::shared_ptr<Queue>tq_;
    typename Queue::value_type val_;
    Handler handler_;
  };
  // private data
  boost::asio::io_service impl_io_service_;
  std::unique_ptr<boost::asio::io_service::work>impl_work_;
  std::thread impl_thread_;
  boost::asio::io_service&post_io_service_;
};
}
}
#endif

7 comments:

  1. good idea - makes it easier to integrate byte streams with queues containing objects (am already experimenting!). you might want to think about consolidating the listener and sender into a single queue object - there is too much duplicate code across the sender/listener - keep at it and post updates, MP

    ReplyDelete
  2. Glad that you are testing the code! Sooner or later I'll evaluate if I should wrap up the asio queue functionality in a single class and I agree with you that there is too much duplicate 'boiler plate code. The reason I ended coding two IO object was mostly because of the history behind a project ... don't have much time right now to analyse and potentially change the code ... maybe later on I'll change it and post the updates.
    Thx for the comment,
    Hans

    ReplyDelete
  3. Really appreciate this wonderful post that you have provided for us.Great site and a great topic as well i really get amazed to read this. Its really good. Boost Sales

    ReplyDelete
  4. Thanks for the post. I am just trying to do the exact same thing. Your post is really a good start for me. One thing that is not that clear to me is if we use boost::asio::yield_context with this queue instead of the current function binder, is this becoming a coroutine so that we can use boost::asio:: spawn to run a goroutine like stuff consuming this queue in a while true loop?

    ReplyDelete
  5. Thanks for the post. I am trying to build an exactly same stuff and this post is really a good start for me. One thing I would like to try out and understand is in your async_deq function, if I pass in a boost::asio::yield_context instead of a function object, do I get a coroutine? So that I can do a while true loop in this coroutine, and the async queue performs exactly like the go channel.
    Thanks,
    Chao

    ReplyDelete
  6. Sorry for the late reply.
    Unfortunately, I have not (yet) used coroutines with boost asio so I can’t really answer your question.
    I would be interested in if you did get something working though …!

    ReplyDelete
    Replies
    1. Additionally, be aware of that there are some issues in the code.

      For example, using raw pointers might not be the best way of coding some of the logic. Also, as pointed out in https://stackoverflow.com/questions/60031286/what-is-the-problem-with-the-function-template-in-the-following-code there might be a mix-up of std::bind and boost::bind – I haven’t looked into it.

      In addition, the code was written in 2014 when C++17 and C++20 was not available – so certain things could be done in a more elegant way now.

      Maybe at some point I will get back to it and clean-up and modernize the code a little …

      Delete