Tuesday, August 12, 2014

Being strategically lazy

In general, we mean by any concept nothing more than a set of operations; the concept is synonymous with a corresponding set of operations. … It is evident that if we adopt this point of view towards concepts, namely that the proper definition of a concept is not in terms of its properties but in terms of actual operations, we need run no danger of having to revise our attitude toward nature. … For of course the true meaning of a term is to be found by observing what a man does with it, not by what he says about it.

-- Percy Bridgman, The Logic of Modern Physics, paper

Having had time to ponder the world of IT over the rainy summer days in Luxembourg, it's time for a non-technical blog entry. The entry is about amortized productivity or, more specifically, about being strategically lazy in order to be more effective over the medium to long term.

I am lazy

I have to admit it - I'm a lazy person. If I can automate automation (meta automation?), I'm all for it, especially when it comes to developing IT products. There is a catch, though: some work has to be put into the part that automates automation. As long as the work put into automating the automation is less than the work that would otherwise go into manually creating an automated product, it's a win.

This blog entry is about automating automation, a concept I call being strategically lazy. By that I mean that, in the long run, I benefit from putting in what might look like useless work (it has no functional benefit) so that I can reap the rewards later on. This is done in all types of industries. Unfortunately, with a few exceptions, there is a clear lack of it in the IT industry. IT developers and managers use compilers, linkers, IDEs and other industry-accepted tools, but they seldom develop their own tools to automate automation.

Being strategically lazy

It is baffling how often IT managers and developers apply different standards to their own work methods than the standards that customers apply in their own work when using a finished IT product.

Everyone knows that (one of) the purposes of IT is to automate tasks. That is, IT makes life simple and easy for someone out there in the real world because tasks are being done automatically. For this to happen someone must have put in work, usually lots of work, so that the tasks can be automatically executed.

The users of IT systems are smart since they realize that to be productive they have to be strategically lazy. That is, they can't be lazy in the short term (they have to make money to buy the IT system), but in the long term, once they have an IT system, they can sit back and watch the work being done automatically.

In the world of IT, things are often different. In IT shops it is not uncommon to manually create many products that any non-IT person would simply assume could be produced automatically by anyone with the skill set to build an IT system. Managers cringe when developers spend time developing non-functional tools that help them automate tasks. However, managers are not the only problem. Many developers go straight for the functionality without even blinking, when a smarter approach would have been to develop some scaffolding before dropping the first piece of code on the disk.

Being lazy is a good thing, and eating your own dog food (here, using IT when developing IT) is also good as long as there is a benefit to it. The idea is that being lazy in a smart way, and using IT to be lazy when developing IT systems, should help us work less and produce more.

Being lazy when writing documents

The first activity that comes to mind where I, as an IT developer, would want to be lazy is of course the task of writing documents. IT departments are notorious for pumping out documents at an ever-increasing rate. Here I mean manually written documents that take both time and effort to write and that are often out of date by the time they hit the printer tray.

When really thinking about it, it's astounding that documents are not produced automatically in IT departments. There is no magic in doing so. All it takes is to develop models that are programmatically accessible. As soon as a model is implemented and can be accessed programmatically, an almost unlimited number of views of the model can be created and mapped to text and diagrams.

Let's take a concrete example. A large architecture containing systems, interfaces and processes, among other things, can be described using handwritten PowerPoint documents, which by the way is most often the case. It can also be described using a programmatically accessible model, typically realized in a relational database.

Yes, it takes a little more effort to do so, but once it has been done it can be extended little by little without having to manually update and rewrite documents. And by the way, we are IT developers, so it really shouldn't be a big deal. Once a model has been built, documents can be generated automatically by mapping projections of the model onto text and diagrams.
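To make the idea a little more concrete, here is a minimal C++ sketch of a programmatically accessible architecture model with two projections: a plain-text inventory and a Graphviz DOT diagram. The ArchitectureModel, System and Interface types and the to_text/to_dot functions are hypothetical names chosen for illustration, and an in-memory model stands in for the relational database mentioned above.

```cpp
#include <algorithm>
#include <cassert>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, minimal architecture model: systems and the interfaces between them.
struct System { std::string name; std::string owner; };
struct Interface { std::string from; std::string to; std::string protocol; };

class ArchitectureModel {
public:
  void add_system(std::string name, std::string owner) {
    systems_.push_back({std::move(name), std::move(owner)});
  }
  // interfaces may only connect systems that are already part of the model
  void add_interface(std::string from, std::string to, std::string protocol) {
    assert(has_system(from) && has_system(to));
    interfaces_.push_back({std::move(from), std::move(to), std::move(protocol)});
  }
  bool has_system(const std::string& name) const {
    return std::any_of(systems_.begin(), systems_.end(),
                       [&](const System& s) { return s.name == name; });
  }
  const std::vector<System>& systems() const { return systems_; }
  const std::vector<Interface>& interfaces() const { return interfaces_; }
private:
  std::vector<System> systems_;
  std::vector<Interface> interfaces_;
};

// Projection 1: a plain-text inventory of systems and the interfaces they call.
std::string to_text(const ArchitectureModel& m) {
  std::ostringstream os;
  for (const auto& s : m.systems()) {
    os << s.name << " (owner: " << s.owner << ")\n";
    for (const auto& i : m.interfaces())
      if (i.from == s.name) os << "  -> " << i.to << " via " << i.protocol << "\n";
  }
  return os.str();
}

// Projection 2: the same model rendered as a Graphviz DOT diagram.
std::string to_dot(const ArchitectureModel& m) {
  std::ostringstream os;
  os << "digraph architecture {\n";
  for (const auto& s : m.systems()) os << "  \"" << s.name << "\";\n";
  for (const auto& i : m.interfaces())
    os << "  \"" << i.from << "\" -> \"" << i.to << "\" [label=\"" << i.protocol << "\"];\n";
  os << "}\n";
  return os.str();
}
```

Adding a new view (say, all interfaces using a given protocol) is then another small function over the same model rather than another hand-maintained document.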

Being lazy when developing larger IT systems

Another area where IT developers are in general not lazy enough is managing the execution of a number of tasks. Yes, I know that many IT shops use cron or write scripts to execute a number of sequential tasks. But what happens when more than one chain of tasks must be executed, or when the chain is not a simple chain but a DAG (directed acyclic graph) of tasks? This type of execution is usually described as a business process.
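The groundwork for running a DAG of tasks does not have to be large. As a sketch, assuming each task is simply a C++ callable plus the names of the tasks it depends on, Kahn's topological sort yields a dependency-respecting execution order and detects cycles. The Task struct and run_dag function are hypothetical; a real process runner would also need error handling, retries, parallel execution and persisted state.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <queue>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical task: an action plus the names of the tasks that must finish first.
struct Task {
  std::function<void()> action;
  std::vector<std::string> depends_on;
};

// Runs every task once, in an order respecting dependencies (Kahn's algorithm).
// Returns the execution order; throws std::runtime_error if the graph has a cycle.
std::vector<std::string> run_dag(const std::map<std::string, Task>& tasks) {
  std::map<std::string, int> pending;                          // unmet dependencies per task
  std::map<std::string, std::vector<std::string>> dependents;  // dependency -> waiting tasks
  for (const auto& kv : tasks) {
    pending.emplace(kv.first, 0);
    for (const auto& dep : kv.second.depends_on) {
      assert(tasks.count(dep) == 1);  // every dependency must be a known task
      ++pending[kv.first];
      dependents[dep].push_back(kv.first);
    }
  }
  // start with the tasks that have no dependencies
  std::queue<std::string> ready;
  for (const auto& kv : pending)
    if (kv.second == 0) ready.push(kv.first);

  std::vector<std::string> order;
  while (!ready.empty()) {
    std::string name = ready.front();
    ready.pop();
    tasks.at(name).action();
    order.push_back(name);
    // a finished task may release the tasks waiting on it
    for (const auto& next : dependents[name])
      if (--pending[next] == 0) ready.push(next);
  }
  if (order.size() != tasks.size())
    throw std::runtime_error("task graph contains a cycle");
  return order;
}
```

A single generic runner like this replaces the per-process glue that would otherwise be copied into one script per business process.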

The problem here is that most IT shops do not lay the fundamental groundwork for implementing business processes and, as a consequence, end up with hundreds or thousands of scripts, each executing its own business process. Sometimes, even worse, they go out and buy an expensive business process engine framework (usually based on BPMN/BPEL, which in 98% of the cases is complete overkill) requiring employees to take courses to make it perform even the most rudimentary operations.

To be fair, I don't lay the entire blame on IT developers. To a large extent, the blame should be laid on IT managers. Most often, managers don't understand the virtue of being strategically lazy.

Being lazy when monitoring IT systems

Monitoring of IT systems usually does not add much functional value to a system. However, as most developers know, not having automatic monitoring of an IT system can be frustrating and result in repetitious tasks like grepping log files every so often.

The lazy approach to functional monitoring is of course to inject probes at various points in the system that log information into some structured storage, like a database. Once the information-generating machinery is in place, it is simple to write tools that run checks and produce reports on the structured information.

Logging structured, programmatically accessible information makes it simple to generate a large number of projections showing how the system behaves. And by the way, it is nothing revolutionary. There is no reason to settle for unstructured log files from which it is excruciatingly difficult to understand the behavior of the system as a whole. Again, the key is the ability to generate projections from structured data in order to better understand the behavior of a system.
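A minimal sketch of such probes, assuming an in-memory EventStore as a stand-in for a database table (Event, Severity, EventStore and components_with_errors are hypothetical names). Each probe records a typed event rather than a free-text line, so a check that would otherwise require grepping log files becomes a simple query over the stored events.

```cpp
#include <cassert>
#include <chrono>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical structured event: what a probe records instead of a free-text log line.
enum class Severity { info, warning, error };

struct Event {
  std::chrono::system_clock::time_point when;
  std::string component;
  Severity severity;
  std::string message;
};

// In-memory stand-in for the structured storage (in practice, e.g. a database table).
class EventStore {
public:
  // a probe: called from various points in the system, possibly from several threads
  void probe(std::string component, Severity sev, std::string message) {
    assert(!component.empty());
    std::lock_guard<std::mutex> lock(mtx_);
    events_.push_back({std::chrono::system_clock::now(), std::move(component), sev,
                       std::move(message)});
  }
  // a projection: number of events at or above a given severity, per component
  std::map<std::string, int> count_by_component(Severity min_sev) const {
    std::lock_guard<std::mutex> lock(mtx_);
    std::map<std::string, int> counts;
    for (const auto& e : events_)
      if (e.severity >= min_sev) ++counts[e.component];
    return counts;
  }
private:
  mutable std::mutex mtx_;
  std::vector<Event> events_;
};

// A check a morning report could run instead of someone grepping log files.
std::vector<std::string> components_with_errors(const EventStore& store) {
  std::vector<std::string> out;
  for (const auto& kv : store.count_by_component(Severity::error)) out.push_back(kv.first);
  return out;
}
```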

Sounds obvious, doesn't it? Yet I have seen many IT shops where developers, functional analysts etc. spend an hour or so every morning checking log files for potential problems. Again, the solution is to be strategically lazy and apply IT knowledge to our own work.

Conclusion

Being strategically lazy is important in almost any area. However, IT by its very nature is in a special position since it is in the business of automation. Given the knowledge of automation and of IT that is inherent in IT shops, I believe it is important that both developers and managers explicitly acknowledge laziness as a tool for being productive. Being lazy does not necessarily mean that you are productive - but not being lazy certainly means that you are not productive in an efficient way.

By the way, C++ metaprogramming comes across as an excellent path towards strategic laziness!

Thursday, August 7, 2014

boost::asio Queues Revamped - Part II

As I get older and crankier, I find myself more and more exasperated with the great inflexible sets of rules that many companies try to pour into concrete and sanctify as methods. The idea that a single method should govern even two different projects is highly suspect: the differences between projects are much more important than the similarities.

-- Tom DeMarco, Controlling Software Projects

Introduction

This entry is short and contains a few modifications to the previous entry together with some new features. Since most of the interesting things are in the code, this entry is almost all code with only a few lines of text interspersed.

Updates to boost::asio queues

In my previous blog entry, Using queues with boost::asio - Part I, I made a few mistakes which I will correct here. I will also add an asynchronous queue_sender which blocks when the queue is full and invokes a handler once it has inserted an item into the queue. Additionally, I changed the API slightly so that a queue_listener (and also the new queue_sender) takes the queue as a constructor parameter.

Problems, mistakes and other idiosyncrasies

I try to publish what I believe is interesting and useful code as soon as I have used it in one or more designs or projects. Even though the code in my blog entries is rather small, I do get comments from colleagues and via email pointing out mistakes, things that could have been done better, or things that could have been done in a different way. I also get emails from developers who have used the code in their own projects!

I'm fully aware that, since the code has not been reviewed by many people, there are mistakes and there are things that could have been done better.

In case you notice mistakes, bugs, or code that is simply incorrect, please feel free to contact me at hansewetz@hotmail.com.

Mistakes in my previous design

In the previous design it was possible to call async_deq passing a queue, followed by one or more calls passing in the same or another queue. Since the implementation object runs a single thread, the requests would be queued in the io_service inside the implementation object and executed one at a time. As a consequence, if async_deq was called on q1 and then on q2, de-queuing on q2 could wait forever if there were no messages in q1, even if q2 had messages.

This problem is corrected by having each IO object handle a single queue which is passed as a parameter in the constructor. The downside is that it is no longer possible to use the same IO object to simultaneously listen on different queues.

I could have kept the original design by managing multiple threads in the implementation object, at the cost of a more complex design. The current design is simple and easy to understand, so I'll stick with it.

New features and an example

The main new feature in the design is a queue_sender which blocks when the queue is full. The underlying queue takes a parameter specifying the maximum number of items the queue can hold. Here is an example:

#include "queue_sender.h"
#include <boost/asio.hpp>
#include <boost/log/trivial.hpp>
#include <string>
#include <memory>
#include <chrono>
#include <functional>
#include <thread>
using namespace std;
using namespace std::placeholders;

// asio queue stuff
shared_ptr<boost::asio::simple_queue<int>>q{new boost::asio::simple_queue<int>(3)};
boost::asio::io_service ios;
boost::asio::simple_queue_sender<int>qsender(::ios,q);

// handler for queue sender
template<typename T>
void qsender_handler(boost::system::error_code const&ec,T item){
  BOOST_LOG_TRIVIAL(debug)<<"sending item in qsender_handler (via asio), item: "<<item<<", ec: "<<ec;
  qsender.async_enq(item+1,std::bind(qsender_handler<T>,_1,item+1));
  // slow things down (note: this also blocks the thread running ios.run())
  std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
// test program
int main(){
  try{
    // start sending messages
    int startItem{1};
    qsender.async_enq(startItem,std::bind(qsender_handler<int>,_1,startItem));

    // kick off io service
    ::ios.run();
  }
  catch(exception const&e){
    BOOST_LOG_TRIVIAL(error)<<"caught exception: "<<e.what();
  }
}

For brevity I've skipped error checking. Specifically I'm not checking the ec error code from boost::asio.

The sample code above will insert exactly 3 integers (1, 2, 3) into the queue, after which async_enq blocks on the queue waiting for items to be removed. As soon as an item is removed, the pending insert completes and the qsender_handler handler is invoked. In this example there is no code de-queuing messages, so the run method will never return.

A more realistic example

A slightly more realistic example is one where a queue listener removes messages from the queue:

#include "queue_listener.h"
#include "queue_sender.h"
#include <boost/asio.hpp>
#include <boost/log/trivial.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <string>
#include <memory>
#include <thread>
#include <functional>
using namespace std;
using namespace std::placeholders;

// asio queue stuff
shared_ptr<boost::asio::simple_queue<int>>q{new boost::asio::simple_queue<int>(3)};
boost::asio::io_service ios;
boost::asio::simple_queue_listener<int>qlistener(::ios,q);
boost::asio::simple_queue_sender<int>qsender(::ios,q);

// max #of messages to send/receive
constexpr size_t maxmsg{10};

// handler for queue listener
template<typename T>
void qlistener_handler(boost::system::error_code const&ec,T item, int nreceived){
  // print item if error code is OK
  BOOST_LOG_TRIVIAL(debug)<<"received item in qlistener_handler (via asio), item: "<<item<<", ec: "<<ec;
  if(nreceived+1<maxmsg)qlistener.async_deq(std::bind(qlistener_handler<T>,_1,_2,nreceived+1));
}
// handler for queue sender
template<typename T>
void qsender_handler(boost::system::error_code const&ec,int item,int nsent){
  BOOST_LOG_TRIVIAL(debug)<<"sending item in qlistener_handler (via asio), item: "<<item<<", ec: "<<ec<<", queue size: "<<q->size();
  if(nsent+1<maxmsg)qsender.async_enq(item+1,std::bind(qsender_handler<T>,_1,item+1,nsent+1));
}
// test program
int main(){
  try{
    // set up a timer that triggers after 1 second, after which we start listening for messages
    boost::asio::deadline_timer tmo(::ios,boost::posix_time::seconds(1));
    tmo.async_wait([&](const boost::system::error_code&ec){qlistener.async_deq(std::bind(qlistener_handler<int>,_1,_2,0));});

    // start sending messages
    int startItem{1};
    qsender.async_enq(startItem,std::bind(qsender_handler<int>,_1,startItem,0));

    // kick off io service
    ::ios.run();
  }
  catch(exception const&e){
    BOOST_LOG_TRIVIAL(error)<<"caught exception: "<<e.what();
  }
}

The program first sets a deadline_timer which, when it pops, starts the queue_listener. This gives the queue_sender a chance to fill up the queue and start blocking. Once the timer pops, the queue_listener removes items from the queue and the queue_sender invokes the qsender_handler handler, which inserts more messages into the queue.

In the example, the handler functions receive the number of messages processed so far. Once that number reaches maxmsg, the asynchronous operations are not re-issued and boost::asio::io_service::run returns.

The output looks like:

sending item in qlistener_handler (via asio), item: 1, ec: system:0, queue size: 1
sending item in qlistener_handler (via asio), item: 2, ec: system:0, queue size: 2
sending item in qlistener_handler (via asio), item: 3, ec: system:0, queue size: 3
received item in qlistener_handler (via asio), item: 1, ec: system:0
received item in qlistener_handler (via asio), item: 2, ec: system:0
received item in qlistener_handler (via asio), item: 3, ec: system:0
sending item in qlistener_handler (via asio), item: 4, ec: system:0, queue size: 1
sending item in qlistener_handler (via asio), item: 5, ec: system:0, queue size: 2
sending item in qlistener_handler (via asio), item: 6, ec: system:0, queue size: 3
received item in qlistener_handler (via asio), item: 4, ec: system:0 
received item in qlistener_handler (via asio), item: 5, ec: system:0 
received item in qlistener_handler (via asio), item: 6, ec: system:0 
sending item in qlistener_handler (via asio), item: 7, ec: system:0, queue size: 1
sending item in qlistener_handler (via asio), item: 8, ec: system:0, queue size: 2
sending item in qlistener_handler (via asio), item: 9, ec: system:0, queue size: 3
received item in qlistener_handler (via asio), item: 7, ec: system:0 
received item in qlistener_handler (via asio), item: 8, ec: system:0 
received item in qlistener_handler (via asio), item: 9, ec: system:0 
sending item in qlistener_handler (via asio), item: 10, ec: system:0, queue size: 1
received item in qlistener_handler (via asio), item: 10, ec: system:0

Notice that the first three messages are inserted immediately. After the first three messages, the async_enq call blocks. Once a message is de-queued, the blocked insert completes and qsender_handler is invoked, which inserts a new message into the queue.

The ability to block when the queue is full, and to invoke a callback handler once it is possible to insert more messages, is useful for applications where throttling is important.
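To illustrate the throttling effect in isolation, here is a std-only sketch with a hypothetical bounded_queue that mirrors the blocking enq()/deq() behavior of simple_queue, without boost::asio and without the disable functions. A fast producer thread is forced to wait for a slow consumer, so it can never get more than maxsize items ahead.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in mirroring simple_queue's blocking enq()/deq() semantics
// (disable_enq/disable_deq omitted for brevity).
template <typename T>
class bounded_queue {
public:
  explicit bounded_queue(std::size_t maxsize) : maxsize_(maxsize) { assert(maxsize > 0); }
  void enq(T t) {
    std::unique_lock<std::mutex> lock(mtx_);
    cond_.wait(lock, [&] { return q_.size() < maxsize_; });  // blocks while full
    q_.push(std::move(t));
    if (q_.size() > max_seen_) max_seen_ = q_.size();        // record the high-water mark
    cond_.notify_all();
  }
  T deq() {
    std::unique_lock<std::mutex> lock(mtx_);
    cond_.wait(lock, [&] { return !q_.empty(); });           // blocks while empty
    T t = std::move(q_.front());
    q_.pop();
    cond_.notify_all();                                      // wakes a blocked producer
    return t;
  }
  std::size_t max_seen() const {
    std::lock_guard<std::mutex> lock(mtx_);
    return max_seen_;
  }
private:
  std::size_t maxsize_;
  std::size_t max_seen_ = 0;
  mutable std::mutex mtx_;
  std::condition_variable cond_;
  std::queue<T> q_;
};

// A fast producer and a slow consumer: enq() blocks until the consumer makes room.
std::vector<int> produce_and_consume(bounded_queue<int>& q, int n) {
  std::thread producer([&] { for (int i = 1; i <= n; ++i) q.enq(i); });
  std::vector<int> received;
  for (int i = 0; i < n; ++i) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));  // simulate a slow consumer
    received.push_back(q.deq());
  }
  producer.join();
  return received;
}
```

In the asio version the producer does not block the caller directly; the blocking enq() runs in the queue_sender's helper thread and the handler is invoked once the item is in the queue.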

Design decisions

There are many ways the design (and implementation) of boost::asio queue-based IO objects could have been done. I chose to separate en-queuing and de-queuing into a queue_listener and a queue_sender, mostly because there is no obvious reason why both en-queuing and de-queuing have to be done through boost::asio. The underlying queue can be used by any client code and is a generic queue not tied directly to boost::asio, even though it must satisfy certain characteristics to work properly with boost::asio.

Alternatively, a single IO object, a queue, could have been implemented supporting both async_enq and async_deq. Time will tell if I made the right decision. If it turns out that having a single IO object is simpler, I'll change the design.

Another design decision I took was to handle only a single queue in an IO object, for simplicity in both design and coding. It could potentially be beneficial to support listening on multiple queues inside a single IO object. However, I will leave this as an exercise for the future when I have more time on my hands.

Yet another example

The example below illustrates how to read from a file descriptor (or socket), process the data, insert it into a queue and finally read it from the queue - all done as asynchronous operations.

The code creates a POSIX asio stream descriptor reading from stdin. It also creates a queue_sender and a queue_listener. Each line is read asynchronously from the file descriptor, then asynchronously inserted into the queue using the queue_sender. At the same time, the queue_listener reads messages asynchronously from the queue and prints them to the screen.

Admittedly, the application is not very useful - however, it does show how asio IO objects can cooperate without explicitly using threads at the application level.

Here is the code:

#include "queue_listener.h"
#include "queue_sender.h"
#include <boost/asio.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/log/trivial.hpp>
#include <atomic>
#include <functional>
#include <istream>
#include <string>
#include <memory>
using namespace std;
using namespace std::placeholders;

// asio io_service
boost::asio::io_service ios;

// variables enabling us to know when we are done
atomic<size_t>nlinesRemaining{0};
atomic<bool>done{false};

// queue stuff
shared_ptr<boost::asio::simple_queue<string>>q{new boost::asio::simple_queue<string>(3)};
boost::asio::simple_queue_listener<string>qlistener(::ios,q);
boost::asio::simple_queue_sender<string>qsender(::ios,q);

// posix stream
boost::asio::posix::stream_descriptor fd_read(::ios,0);
boost::asio::streambuf fd_data;

// handler for queue listener
template<typename T>
void qlistener_handler(boost::system::error_code const&ec,T item){
  if(!ec){
    BOOST_LOG_TRIVIAL(debug)<<"received item in queue: \""<<item<<"\"";
    qlistener.async_deq(std::bind(qlistener_handler<T>,_1,_2));
    --nlinesRemaining;
  }else{
    // ignore errors for right now
    // ...
  }
  // check if we should shut down queues
  if(done&&nlinesRemaining==0){
    q->disable_deq(true);
    q->disable_enq(true);
  }
}
// handler for fd
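void fd_handler(boost::system::error_code ec,size_t size){
  if(ec==boost::asio::error::eof&&size==0){
    // we are done  - but can't shut down queues until they are empty
    done=true;
    fd_read.cancel();
    fd_read.close();
    // if every line has already been de-queued, no further qlistener_handler call
    // will run the shutdown check, so shut down the queues here
    if(nlinesRemaining==0){
      q->disable_deq(true);
      q->disable_enq(true);
    }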
  }else
  if(ec.value()){
    // ignore errors for right now
    // ...
  }else{
    // get read line and enque it asynchronously into the queue
    istream is(&fd_data);
    string item;
    getline(is,item);
    ++nlinesRemaining;
    qsender.async_enq(item,[&](boost::system::error_code const&ec){});

    // continue reading from fd
    boost::asio::async_read_until(fd_read,fd_data,'\n',std::bind(fd_handler,_1,_2));
  }
}
// test program
int main(){
  try{
    // read line by line from stdin
    boost::asio::async_read_until(fd_read,fd_data,'\n',std::bind(fd_handler,_1,_2));

    // start listening to messages from queue
    qlistener.async_deq(std::bind(qlistener_handler<string>,_1,_2));

    // run asio 
    ::ios.run();
  }
  catch(exception const&e){
    BOOST_LOG_TRIVIAL(error)<<"caught exception: "<<e.what();
  }
}

The program can be tested as follows:

cat Makefile | test2

The result would look like:

received item in queue: "all:"
received item in queue: "   $(MAKE) -f Makefile.test1"
received item in queue: "   $(MAKE) -f Makefile.test2"
received item in queue: ""
received item in queue: "clean:"
received item in queue: "   $(MAKE) -f Makefile.test1 clean" 
received item in queue: "   $(MAKE) -f Makefile.test2 clean" 

One thing to take note of is that we must be careful to terminate boost::asio::io_service::run correctly. Here I use two variables: nlinesRemaining tracks the number of items remaining to be pulled out of the queue, and done is set when there is nothing more to read. Once both conditions hold, the queues are disabled so that the pending de-queue operation is aborted and run can return.

Conclusions

Even though boost::asio is mostly focused on streams of bytes, I can't see a good reason for not supporting basic data structures such as queues in an asynchronous boost::asio environment. A typical use case is receiving messages from a queue and serializing them as bytes through a socket. Conversely, receiving bytes through a socket, packing them into messages and sending them through a queue is also a typical use case. In the latter case, the ability to block when the queue is getting full can become important in some environments.

Notice that blocking does not refer to blocking the boost::asio::run event loop, but rather blocking on some internal queue which is invisible to the client code.
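The mechanism behind this can be sketched without Boost. Below, a hypothetical event_loop stands in for io_service and a hypothetical async_call stands in for the IO objects: the blocking call runs on a helper thread, and only the completion handler is posted back to the loop, so run() keeps processing other handlers in the meantime. As a simplification, this sketch starts one thread per call, whereas queue_listener and queue_sender use one helper thread (with its own io_service) per IO object.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical minimal event loop standing in for io_service: posted handlers
// run only on the thread that calls run().
class event_loop {
public:
  void post(std::function<void()> h) {
    { std::lock_guard<std::mutex> lock(mtx_); handlers_.push_back(std::move(h)); }
    cond_.notify_one();
  }
  // outstanding-work counter, playing the role of io_service::work
  void add_work() { std::lock_guard<std::mutex> lock(mtx_); ++work_; }
  void remove_work() {
    { std::lock_guard<std::mutex> lock(mtx_); --work_; }
    cond_.notify_one();
  }
  // runs handlers until none are queued and no work is outstanding
  void run() {
    for (;;) {
      std::function<void()> h;
      {
        std::unique_lock<std::mutex> lock(mtx_);
        cond_.wait(lock, [&] { return !handlers_.empty() || work_ == 0; });
        if (handlers_.empty()) return;
        h = std::move(handlers_.front());
        handlers_.pop_front();
      }
      h();  // executed outside the lock, on the run() thread
    }
  }
private:
  std::mutex mtx_;
  std::condition_variable cond_;
  std::deque<std::function<void()>> handlers_;
  int work_ = 0;
};

// The blocking call runs on a helper thread; only its completion is posted to the loop.
template <typename Blocking, typename Handler>
std::thread async_call(event_loop& loop, Blocking blocking, Handler handler) {
  loop.add_work();  // keep run() alive while the operation is outstanding
  return std::thread([&loop, blocking, handler]() mutable {
    auto result = blocking();  // may block; the event loop is unaffected
    loop.post([handler, result]() mutable { handler(result); });
    loop.remove_work();        // posted before removing work, so run() sees the handler
  });
}
```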

No doubt there will be changes to the boost::asio queue support once it is being used seriously. When changes are made I'll post them on this blog.

The full code

The implementations of the queue_listener and the queue_sender are both simple. The full code for the boost::asio queue support, together with the default simple_queue, is shown here:

simple_queue.h

#ifndef __SIMPLE_QUEUE_H__
#define __SIMPLE_QUEUE_H__
#include <cstddef>
#include <utility>
#include <queue>
#include <mutex>
#include <condition_variable>
namespace boost{
namespace asio{

// a simple thread safe queue used as default queue in boost::asio::queue_listener
template<typename T,typename Container=std::queue<T>>
class simple_queue{
public:
  // typedef for value stored in queue
  // (need this so we can create an item with default ctor)
  using value_type=T;

  // ctors,assign,dtor
  simple_queue(std::size_t maxsize):maxsize_(maxsize){}
  simple_queue(simple_queue const&)=delete;
  simple_queue(simple_queue&&)=default;
  simple_queue&operator=(simple_queue const&)=delete;
  simple_queue&operator=(simple_queue&&)=default;
  ~simple_queue()=default;

  // put a message into queue
  bool enq(T t){
    std::unique_lock<std::mutex>lock(mtx_);
    cond_.wait(lock,[&](){return !enq_enabled_||q_.size()<maxsize_;});
    if(!enq_enabled_)return false;
    q_.push(t);
    cond_.notify_all();
    return true;
  }
  // dequeue a message (return.first == false if deq() was disabled)
  std::pair<bool,T>deq(){
    std::unique_lock<std::mutex>lock(mtx_);
    cond_.wait(lock,[&](){return !deq_enabled_||!q_.empty();});

    // if deq is disabled or queue is empty return 
    if(!deq_enabled_||q_.empty()){
      return std::make_pair(false,T{});
    }
    // check if we have a message
    std::pair<bool,T>ret{std::make_pair(true,q_.front())};
    q_.pop();
    cond_.notify_all();
    return ret;
  }
  // cancel deq operations (will also release blocking threads)
  void disable_deq(bool disable){
    std::unique_lock<std::mutex>lock(mtx_);
    deq_enabled_=!disable;
    cond_.notify_all();
  }
  // cancel enq operations (will also release blocking threads)
  void disable_enq(bool disable){
    std::unique_lock<std::mutex>lock(mtx_);
    enq_enabled_=!disable;
    cond_.notify_all();
  }
  // set max size of queue
  void set_maxsize(std::size_t maxsize){
    std::unique_lock<std::mutex>lock(mtx_);
    maxsize_=maxsize;
    cond_.notify_all();
  }
  // check if queue is empty
  bool empty()const{
    std::unique_lock<std::mutex>lock(mtx_);
    return q_.empty();
  }
  // get #of items in queue
  std::size_t size()const{
    std::unique_lock<std::mutex>lock(mtx_);
    return q_.size();
  }
  // get max items in queue
  std::size_t maxsize()const{
    std::unique_lock<std::mutex>lock(mtx_);
    return maxsize_;
  }
private:
  std::size_t maxsize_;
  mutable std::mutex mtx_;
  mutable std::condition_variable cond_;
  bool deq_enabled_=true;
  bool enq_enabled_=true;
  Container q_;
};
}
}
#endif

queue_listener.h

#ifndef __QUEUE_LISTENER_H__
#define __QUEUE_LISTENER_H__
#include "simple_queue.h"
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <memory>
#include <thread>
namespace boost{
namespace asio{

// forward decl
class queue_listener_impl;
template<typename Impl=queue_listener_impl>class basic_queue_listener_service;

// --- IO Object (used by client) -----------------------------
template<typename Service,typename Queue>
class basic_queue_listener:public boost::asio::basic_io_object<Service>{
public:
  // ctor
  explicit basic_queue_listener(boost::asio::io_service&io_service,std::shared_ptr<Queue>q):
      boost::asio::basic_io_object<Service>(io_service),q_(q){
  }
  // async deq operation
  template <typename Handler>
  void async_deq(Handler handler) {
    this->service.async_deq(this->implementation,q_,handler);
  }
private:
  std::shared_ptr<Queue>q_;
};
// typedef for using standard service object
template<typename T>
using simple_queue_listener=basic_queue_listener<basic_queue_listener_service<>,simple_queue<T>>;

// --- service class -----------------------------
// (for one io_service, only one object created)
template<typename Impl>
class basic_queue_listener_service:public boost::asio::io_service::service{
public:
  // required to have id of service
 static boost::asio::io_service::id id;

  // ctor
  explicit basic_queue_listener_service(boost::asio::io_service&io_service):
      boost::asio::io_service::service(io_service){
  }
  // dtor
  ~basic_queue_listener_service(){
  }
  // get a typedef  for implementation
  using implementation_type=std::shared_ptr<Impl>;

  // mandatory (construct an implementation object)
  void construct(implementation_type&impl){
      impl.reset(new Impl(this->get_io_service()));
  }
  // mandatory (destroy an implementation object)
  void destroy(implementation_type&impl){
    impl.reset();
  }
  // async deq operation
  template <typename Handler,typename Queue>
  void async_deq(implementation_type&impl,std::shared_ptr<Queue>q,Handler handler){
    // this is a non-blocking operation so we are OK calling impl object in this thread
    impl->async_deq(impl,q,handler);
  }
private:
  // shutdown service (required)
  void shutdown_service(){
  }
};
// definition of id of service (required)
template <typename Impl>
boost::asio::io_service::id basic_queue_listener_service<Impl>::id;

// --- implementation -----------------------------
class queue_listener_impl{
public:
  // ctor (set up work queue for io_service so we don't bail out when executing run())
  queue_listener_impl(boost::asio::io_service&post_io_service):
      impl_work_(new boost::asio::io_service::work(impl_io_service_)),
      impl_thread_([&](){impl_io_service_.run();}),
      post_io_service_(post_io_service){
  }
  // dtor (clear work queue, stop io service and join thread)
  ~queue_listener_impl(){
    impl_work_.reset(nullptr);
    impl_io_service_.stop();
    if(impl_thread_.joinable())impl_thread_.join();
  }
public:
  // deque message (post request to thread)
  template<typename Handler,typename Queue>
  void async_deq(std::shared_ptr<queue_listener_impl>impl,std::shared_ptr<Queue>q,Handler handler){
    impl_io_service_.post(deq_operation<Handler,Queue>(impl,post_io_service_,q,handler));
  }
private:
  // function object calling blocking deq() on queue
  template <typename Handler,typename Queue>
  class deq_operation{
  public:
    // ctor
    deq_operation(std::shared_ptr<queue_listener_impl>impl,boost::asio::io_service&io_service,std::shared_ptr<Queue>q,Handler handler):
        wimpl_(impl),io_service_(io_service),work_(io_service),q_(q),handler_(handler) {
    }
    // function calling implementation object - runs in the thread created in ctor
    void operator()(){
      // make sure implementation object is still valid
      std::shared_ptr<queue_listener_impl>impl{wimpl_.lock()};

      // if valid, go ahead and do blocking call on queue, otherwise post aborted message
      if(impl){
        std::pair<bool,typename Queue::value_type>ret{q_->deq()};
        boost::system::error_code ec=(!ret.first?boost::asio::error::operation_aborted:boost::system::error_code());
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,ec,ret.second));
      }else{
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,boost::asio::error::operation_aborted,typename Queue::value_type()));
      }
    }
  private:
    std::weak_ptr<queue_listener_impl>wimpl_;
    boost::asio::io_service&io_service_;
    boost::asio::io_service::work work_;
    std::shared_ptr<Queue>q_;
    Handler handler_;
  };
  // private data
  boost::asio::io_service impl_io_service_;
  std::unique_ptr<boost::asio::io_service::work>impl_work_;
  std::thread impl_thread_;
  boost::asio::io_service&post_io_service_;
};
}
}
#endif

queue_sender.h

#ifndef __QUEUE_SENDER_H__
#define __QUEUE_SENDER_H__
#include "simple_queue.h"
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <memory>
#include <thread>
namespace boost{
namespace asio{

// forward decl
class queue_sender_impl;
template<typename Impl=queue_sender_impl>class basic_queue_sender_service;

// --- IO Object (used by client) -----------------------------
template<typename Service,typename Queue>
class basic_queue_sender:public boost::asio::basic_io_object<Service>{
public:
  // ctor
  explicit basic_queue_sender(boost::asio::io_service&io_service,std::shared_ptr<Queue>q):
      boost::asio::basic_io_object<Service>(io_service),q_(q){
  }
  // async enq operation
  template <typename Handler>
  void async_enq(typename Queue::value_type val,Handler handler) {
    this->service.async_enq(this->implementation,q_,val,handler);
  }
private:
  std::shared_ptr<Queue>q_;
};
// typedef for using standard service object
template<typename T>
using simple_queue_sender=basic_queue_sender<basic_queue_sender_service<>,simple_queue<T>>;

// --- service class -----------------------------
// (for one io_service, only one object created)
template<typename Impl>
class basic_queue_sender_service:public boost::asio::io_service::service{
public:
  // required to have id of service
 static boost::asio::io_service::id id;

  // ctor
  explicit basic_queue_sender_service(boost::asio::io_service&io_service):
      boost::asio::io_service::service(io_service){
  }
  // dtor
  ~basic_queue_sender_service(){
  }
  // get a typedef  for implementation
  using implementation_type=std::shared_ptr<Impl>;

  // mandatory (construct an implementation object)
  void construct(implementation_type&impl){
      impl.reset(new Impl(this->get_io_service()));
  }
  // mandatory (destroy an implementation object)
  void destroy(implementation_type&impl){
    impl.reset();
  }
  // async enq operation
  template <typename Handler,typename Queue>
  void async_enq(implementation_type&impl,std::shared_ptr<Queue>q,typename Queue::value_type val,Handler handler){
    // this is a non-blocking operation so we are OK calling impl object in this thread
    impl->async_enq(impl,q,val,handler);
  }
private:
  // shutdown service (required)
  void shutdown_service(){
  }
};
// definition of id of service (required)
template <typename Impl>
boost::asio::io_service::id basic_queue_sender_service<Impl>::id;

// --- implementation -----------------------------
class queue_sender_impl{
public:
  // ctor (set up work queue for io_service so we don't bail out when executing run())
  queue_sender_impl(boost::asio::io_service&post_io_service):
      impl_work_(new boost::asio::io_service::work(impl_io_service_)),
      impl_thread_([&](){impl_io_service_.run();}),
      post_io_service_(post_io_service){
  }
  // dtor (clear work queue, stop io service and join thread)
  ~queue_sender_impl(){
    impl_work_.reset(nullptr);
    impl_io_service_.stop();
    if(impl_thread_.joinable())impl_thread_.join();
  }
public:
  // enque message (post request to thread)
  template<typename Handler,typename Queue>
  void async_enq(std::shared_ptr<queue_sender_impl>impl,std::shared_ptr<Queue>tq,typename Queue::value_type val,Handler handler){
    impl_io_service_.post(enq_operation<Handler,Queue>(impl,post_io_service_,tq,val,handler));
  }
private:
  // function object calling blocking enq() on queue
  template <typename Handler,typename Queue>
  class enq_operation{
  public:
    // ctor
    enq_operation(std::shared_ptr<queue_sender_impl>impl,boost::asio::io_service &io_service,std::shared_ptr<Queue>tq,typename Queue::value_type val,Handler handler):
        wimpl_(impl),io_service_(io_service),work_(io_service),tq_(tq),val_(val),handler_(handler) {
    }
    // function calling implementation object - runs in the thread created in ctor
    void operator()(){
      // make sure implementation object is still valid
      std::shared_ptr<queue_sender_impl>impl{wimpl_.lock()};

      // if valid, go ahead and do (potentially) blocking call on queue, otherwise post aborted message
      if(impl){
        bool ret{tq_->enq(val_)};
        boost::system::error_code ec=(!ret?boost::asio::error::operation_aborted:boost::system::error_code());
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,ec));
      }else{
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,boost::asio::error::operation_aborted));
      }
    }
  private:
    std::weak_ptr<queue_sender_impl>wimpl_;
    boost::asio::io_service&io_service_;
    boost::asio::io_service::work work_;
    std::shared_ptr<Queue>tq_;
    typename Queue::value_type val_;
    Handler handler_;
  };
  // private data
  boost::asio::io_service impl_io_service_;
  std::unique_ptr<boost::asio::io_service::work>impl_work_;
  std::thread impl_thread_;
  boost::asio::io_service&post_io_service_;
};
}
}
#endif