Wednesday, November 19, 2014

boost::asio Queues Rediscovered - Part III

Complexity is proportional to the effort it takes to see that something is not correct

-- unknown author

Introduction

In the previous posts about queues and boost::asio I only used a single in-memory queue - the asio::simple_queue. In this blog entry I'll show that it does not require much effort to implement new types of queues which work seamlessly with the boost::asio::queue_listener and boost::asio::queue_sender described in previous posts.

After a few months hacking around with queues and asio it's become clear that the combination of the two is indispensable when quickly wiring up the plumbing of asynchronous messaging infrastructures. Even more, after adding support for other queue types such as directory based queues where queue items are stored in files (yes, many legacy systems do just that), it is now both simple and quick to re-write old outdated messaging infrastructures with transparent asynchronous messaging at the application level.

I'll give two examples of queues: one where queue items are stored as files in a directory, and a second one where queues are used to send messages through file descriptors, specifically pipes.

In the future I can imagine a great many other types of queues. For example socket_queue, html_queue (would it not be great to get rid of those pesky httpd servers littering the IT landscape in small internal systems serving only the purpose of receiving messages and then returning results via ftp or some other messaging mechanism), webservice_queue and many others. As we speak some are already in progress, but unfortunately cannot be published yet.

I have made a few slight modifications to the code from the previous entries, as you'll see. However, if you have read the previous blog entries you should have no problem understanding what's going on.

Design changes from past posts

The classes used for listening on queues and sending messages on queues have been changed. The listener type is now queue_listener<queue_type> and the sender type is queue_sender<queue_type>, regardless of the type of queue.

One specific change was to remove the constraint that queues were passed by std::shared_ptr to senders and listeners. Now queues are simply passed by pointer, which means the caller must keep the queue alive for as long as a sender or listener uses it.

The queue_listener<queue_type> class has the following interface:

template<typename Service,typename Queue>
class basic_queue_listener:public boost::asio::basic_io_object<Service>{
public:
  // ctor
  explicit basic_queue_listener(boost::asio::io_service&io_service,Queue*q);

  // async deq operation
  template <typename Handler>
  void async_deq(Handler handler);

  // async deq operation - timed
  template <typename Handler>
  void timed_async_deq(Handler handler,std::size_t ms);

  // wait until we can deq a message from queue in async mode
  template <typename Handler>
  void async_wait_deq(Handler handler);

  // wait until we can deq a message from queue in async mode - timed
  template <typename Handler>
  void timed_async_wait_deq(Handler handler,std::size_t ms);
 
  // sync deq operation (blocking)
  std::pair<bool,typename Queue::value_type>sync_deq(boost::system::error_code&ec);
};
// typedefs for using standard queue listeners
template<typename Queue>using queue_listener=basic_queue_listener<basic_queue_listener_service<>,Queue>;

The queue_sender<queue_type> class has the following interface:

template<typename Service,typename Queue>
class basic_queue_sender:public boost::asio::basic_io_object<Service>{
public:
  // ctor
  explicit basic_queue_sender(boost::asio::io_service&io_service,Queue*q):
      boost::asio::basic_io_object<Service>(io_service),q_(q){
  }
  // async enq operation
  template <typename Handler>
  void async_enq(typename Queue::value_type val,Handler handler);
  
  // async enq operation - timed
  template <typename Handler>
  void timed_async_enq(typename Queue::value_type val,Handler handler,std::size_t ms);

  // wait until we can put a message in queue in async mode
  template <typename Handler>
  void async_wait_enq(Handler handler);

  // wait until we can put a message in queue in async mode - timed
  template <typename Handler>
  void timed_async_wait_enq(Handler handler,std::size_t ms);

  // sync enq operation (blocking)
  void sync_enq(typename Queue::value_type val,boost::system::error_code&ec);
};
// typedefs for using standard queue senders
template<typename Queue>using queue_sender=basic_queue_sender<basic_queue_sender_service<>,Queue>;

Because the interface now supports timed operations a few more requirements will be put on the queues.

Timed operations

The new listener and sender interfaces support timed operations, i.e., an enq or deq operation can be started with a timeout. When it is not possible to enq or deq a message within the timeout, the boost::asio error code is set to boost::asio::error::timed_out in the callback function.

If a queue already supports cancellation of an operation and is thread safe, a timed operation is simply a convenience. It would also be possible to set up a boost::asio::deadline_timer and cancel the ongoing operation directly on the queue from the timer callback. Having done this a number of times it became annoying, so the timed operations were added.

A point to notice about timed operations is that the time-out refers to the time until a message can be enqueued or dequeued. Once a dequeue or an enqueue has started, the timer stops running.

As a consequence, a message being read could potentially hang forever if no more data arrives through the queue. This could be seen as a design flaw and I might try to correct it at some point by using one timeout between messages (inter-message) and a second timeout within a message (intra-message).

There is a problem having a timer for intra messages since it could potentially leave the queue in an inconsistent state. In any case, this needs to be analysed to see what solutions if any are possible. Clearly, if the queues were transactional the situation would be different. However, the asio queues were never meant to be used in a transactional setting and were therefore not designed for it.
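To make the timing semantics concrete, here is a minimal sketch of what a queue-side timed dequeue could look like. It is not the code of any of the asio queues: the class name sketch_queue is hypothetical, and std::errc::timed_out is used as a stand-in for boost::asio::error::timed_out so that the sketch compiles without Boost. The timer only covers the wait for a message to become available.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <system_error>
#include <utility>

// hypothetical in-memory queue (illustration only, not one of the asio queues)
template<typename T>
class sketch_queue{
public:
  using value_type=T;

  // put a message in the queue and wake up one waiting consumer
  void enq(T val){
    {
      std::lock_guard<std::mutex>lock(mtx_);
      q_.push_back(std::move(val));
    }
    cond_.notify_one();
  }
  // wait at most ms milliseconds until a message is available
  // (the timer covers only the wait - taking the message is not timed)
  std::pair<bool,T>timed_deq(std::size_t ms,std::error_code&ec){
    std::unique_lock<std::mutex>lock(mtx_);
    bool ready=cond_.wait_for(lock,std::chrono::milliseconds(ms),[this]{return !q_.empty();});
    if(!ready){
      // stand-in for boost::asio::error::timed_out
      ec=std::make_error_code(std::errc::timed_out);
      return std::make_pair(false,T{});
    }
    ec.clear();
    T ret(std::move(q_.front()));
    q_.pop_front();
    return std::make_pair(true,std::move(ret));
  }
private:
  std::mutex mtx_;
  std::condition_variable cond_;
  std::deque<T>q_;
};
```

The return type mirrors the std::pair<bool,value_type> returned by sync_deq in the listener interface: the bool says whether a message was actually dequeued.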

Flow control

When plumbing components together with queues the issue of flow control will inevitably show up. Queues usually have a limited capacity, whether for purely physical reasons, for technical design reasons or sometimes for functional reasons. In any case, it must be possible to control the flow of messages upstream when queues are blocking downstream.

Flow control is supported by methods on the queue_listener and the queue_sender. The methods are async_wait_deq, timed_async_wait_deq, async_wait_enq and timed_async_wait_enq. The timed methods have callbacks returning the error code boost::asio::error::timed_out in case of a time-out. The other methods return either an OK error code or a boost::asio::error::operation_aborted code in case some other thread disabled deq or enq operations directly on the queue.

Exactly how the underlying queue decides when a message can be enqueued or dequeued is up to the queue implementation. Some of the queues directly track the number of messages in the queue and use a mutex and condition variables to implement the operations required to support wait type operations. The fd type queues use a select loop to manage when a message can be enqueued or dequeued.
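As an illustration of the mutex and condition variable approach, the sketch below shows how a bounded queue might implement a timed wait for room to enqueue. The class bounded_sketch_queue and its member names are hypothetical and not taken from the asio queues, and std::errc::timed_out and std::errc::operation_canceled stand in for the corresponding boost::asio error codes.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <system_error>
#include <utility>

// hypothetical bounded queue tracking the number of stored messages
template<typename T>
class bounded_sketch_queue{
public:
  explicit bounded_sketch_queue(std::size_t maxsize):maxsize_(maxsize){}

  // block at most ms milliseconds until there is room for one more message
  bool timed_wait_enq(std::size_t ms,std::error_code&ec){
    std::unique_lock<std::mutex>lock(mtx_);
    bool ok=cond_.wait_for(lock,std::chrono::milliseconds(ms),
                           [this]{return !enq_enabled_||q_.size()<maxsize_;});
    if(!enq_enabled_)ec=std::make_error_code(std::errc::operation_canceled);
    else if(!ok)ec=std::make_error_code(std::errc::timed_out);
    else ec.clear();
    return !ec;
  }
  // enqueue without blocking, returns false if the queue is full or disabled
  bool try_enq(T val){
    std::lock_guard<std::mutex>lock(mtx_);
    if(!enq_enabled_||q_.size()>=maxsize_)return false;
    q_.push_back(std::move(val));
    return true;
  }
  // dequeue without blocking and tell waiting senders that there is room
  bool try_deq(T&val){
    {
      std::lock_guard<std::mutex>lock(mtx_);
      if(q_.empty())return false;
      val=std::move(q_.front());
      q_.pop_front();
    }
    cond_.notify_all();
    return true;
  }
  // disable enqueuing and wake up all waiting senders
  void disable_enq(){
    {
      std::lock_guard<std::mutex>lock(mtx_);
      enq_enabled_=false;
    }
    cond_.notify_all();
  }
private:
  std::size_t maxsize_;
  bool enq_enabled_=true;
  std::mutex mtx_;
  std::condition_variable cond_;
  std::deque<T>q_;
};
```

A sender that waits with timed_wait_enq before producing the next message is throttled automatically whenever the consumer falls behind.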

Here is a snippet of code showing how to wait until we know it is possible to enqueue a message into a queue. Notice that the callback may eventually be called with a timeout error code:

// ...
    
// handler for waiting for starting sending messages
void qsender_waiter_handler(boost::system::error_code const&ec,asio::queue_sender<enq_t>*qs){
  if(ec!=0){
    BOOST_LOG_TRIVIAL(debug)<<"enq-wait() aborted (via asio), ec: "<<ec.message();
  }else{
    // we know we can send a message now
    // ...
  }
}   

    // ...
    
    // sender
    asio::queue_sender<enq_t>qsender(::ios,&qin);
    
    // wait tmo_enq_ms ms until we can send a message
    qsender.timed_async_wait_enq(std::bind(qsender_waiter_handler,_1,&qsender),tmo_enq_ms);

    // ...

    BOOST_LOG_TRIVIAL(debug)<<"starting asio ...";
    ::ios.run();
    
// ...

The boost::asio::polldir_queue

The name polldir_queue is unfortunate since the queue does not poll a directory. Let's just say the name never changed for historical reasons.

The queue is a simple queue storing messages as files in a directory. The user supplies a serialiser and a deserialiser which are used when messages are moved between file and memory. Exactly when serialisation (and de-serialisation) is done is transparent to the user - only the functions need to be supplied.

The queue is thread safe through the use of boost::interprocess::named_mutex and boost::interprocess::named_condition. However, the restriction is clearly that the processes accessing the queue must run on the same machine. The directory where messages, i.e. files, are stored can be on a shared drive though.
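The idea of storing each message as a file can be sketched in a few lines. The code below is not the polldir_queue implementation: the helper names dir_enq and dir_deq and the zero padded sequence number used as file name are assumptions made for illustration, and there is no locking at all, so it is only safe from a single thread in a single process. It uses std::filesystem (C++17) instead of boost::filesystem to stay self-contained.

```cpp
#include <algorithm>
#include <cstddef>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

namespace fs=std::filesystem;

// hypothetical helper: store one message as a file named after its sequence number
template<typename T,typename Serialiser>
void dir_enq(fs::path const&dir,std::size_t seq,T const&msg,Serialiser serialiser){
  fs::create_directories(dir);
  std::ostringstream name;
  name<<std::setw(12)<<std::setfill('0')<<seq;   // zero padded so names sort by sequence number
  std::ofstream os((dir/name.str()).string());
  serialiser(os,msg);
}
// hypothetical helper: read and remove the file with the smallest name
template<typename T,typename Deserialiser>
std::pair<bool,T>dir_deq(fs::path const&dir,Deserialiser deserialiser){
  // collect the names of all message files
  std::vector<fs::path>files;
  if(fs::exists(dir)){
    for(auto const&entry:fs::directory_iterator(dir)){
      if(entry.is_regular_file())files.push_back(entry.path());
    }
  }
  if(files.empty())return std::make_pair(false,T{});

  // deserialise the oldest message, then delete its file
  fs::path oldest=*std::min_element(files.begin(),files.end());
  T ret;
  {
    std::ifstream is(oldest.string());
    ret=deserialiser(is);
  }
  fs::remove(oldest);
  return std::make_pair(true,std::move(ret));
}
```

A call such as dir_deq<std::string>(dir,deserialiser) returns the message with the lowest sequence number, or a pair with first set to false when the directory holds no messages.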

The implementation is relatively straightforward even though it is tedious. There are a number of design decisions that must be taken that either limit the queue's usability (and make the code simpler) or make the queue more usable (which makes the code more complicated).

One decision I took was not to use inotify but instead to rely on caching the names of the files in the directory storing the queued messages, and to get notified through the boost interprocess mutex and condition variable mechanism. Either design choice has its advantages and disadvantages. For example, inotify will not work correctly on shared file systems. However, it would not take much effort to implement a queue based on inotify if it would provide features not supported by the current implementation. This is probably an exercise I'll undertake in the not too distant future.

Reading the code examples

If you read the code examples below, they may seem kind of long and unwieldy. I didn't cut out bits and pieces of code just to show the principles, since that sometimes makes it hard to follow the flow of the logic in a program. Since the code is based on callbacks, the best way to start is to read the main program and trace the code upwards from there.

An example using a boost::asio::polldir_queue

Below is a small example showing how the queue can be used. I'll give the entire blob of code directly since it is straightforward to read.

The code creates two queues, both referring to the same directory, a queue sender and a queue listener (we don't really need two queues here - it's only to illustrate how the queues work). The main program spawns a thread which inserts messages synchronously into the queue using the queue sender.

The queue listener waits for messages asynchronously with a time-out. When it picks up a message it re-triggers listening on messages. If the time-out is reached, it does not continue to listen for messages and the asio loop is exited.

Here is the code:

#include <boost/asio_queue.hpp>
#include <boost/asio.hpp>
#include <boost/log/trivial.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/filesystem.hpp>
#include <string>
#include <memory>
#include <thread>
#include <chrono>
#include <functional>
#include <iostream>
using namespace std;
using namespace std::placeholders;

namespace asio= boost::asio;
namespace fs=boost::filesystem;

// test queue item
class Message{
public:
  friend ostream&operator<<(ostream&os,Message const&junk){
    os<<junk.s_<<" "<<junk.i_;
    return os;
  }
  friend istream&operator>>(istream&is,Message&junk){
    is>>junk.s_>>junk.i_;
    return is;
  }
  Message()=default;
  Message(string const&s,int i):s_(s),i_(i){}
  Message(Message const&)=default;
  Message(Message&&)=default;
  Message&operator=(Message const&)=default;
  Message&operator=(Message&&)=default;
private:
  string s_;
  int i_;
};
// timeout for deq()
size_t tmo_ms{3000};

// value type in queues
// (must work with operator<< and operator>>, and be default constructible)
using qval_t=Message;

// create sender and listener queues (could be the same queue)
function<qval_t(istream&)>deserialiser=[](istream&is){qval_t ret;is>>ret;return ret;};
function<void(ostream&,qval_t const&)>serialiser=[](ostream&os,qval_t const&i){os<<i;};
string qname{"q1"};
fs::path qdir{"./q1"};
using queue_t=asio::polldir_queue<qval_t,decltype(deserialiser),decltype(serialiser)>;
queue_t qrecv{qname,0,qdir,deserialiser,serialiser,true};
queue_t qsend{qname,0,qdir,deserialiser,serialiser,true};

// setup asio object
asio::io_service ios;
asio::queue_listener<queue_t>qlistener(::ios,&qrecv);
asio::queue_sender<queue_t>qsender(::ios,&qsend);

// max #of messages to send/receive
constexpr size_t maxmsg{3};

// handler for queue listener
void qlistener_handler(boost::system::error_code const&ec,qval_t const&item){
  if(ec!=0){
    BOOST_LOG_TRIVIAL(debug)<<"deque() aborted (via asio), ec: "<<ec.message();
  }else{
    BOOST_LOG_TRIVIAL(debug)<<"received item in qlistener_handler (via asio), item: "<<item<<", ec: "<<ec;
    qlistener.timed_async_deq(qlistener_handler,tmo_ms);
  }
}
// thread function sending maxmsg messages
void thr_send_sync_messages(){
  for(int i=0;i<maxmsg;++i){
    qval_t item{Message{string("Hello")+boost::lexical_cast<string>(i),i}};
    BOOST_LOG_TRIVIAL(debug)<<"sending item: "<<item;
    boost::system::error_code ec;
    qsender.sync_enq(item,ec);
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
}
// test program
int main(){
  try{
    // remove locks for queue
    qrecv.removeLockVariables(qrecv.qname());

    // listen for messages on q1 (using asio)
    BOOST_LOG_TRIVIAL(debug)<<"starting async_deq() ...";
    qlistener.timed_async_deq(qlistener_handler,tmo_ms);

    // kick off sender thread
    BOOST_LOG_TRIVIAL(debug)<<"starting thread sender thread ...";
    thread thr(thr_send_sync_messages);

    // kick off io service
    BOOST_LOG_TRIVIAL(debug)<<"starting asio ...";
    ::ios.run();

    // join thread
    BOOST_LOG_TRIVIAL(debug)<<"joining with sender thread ...";
    thr.join();
  }
  catch(exception const&e){
    BOOST_LOG_TRIVIAL(error)<<"caught exception: "<<e.what();
  }
}

With Boost logging at debug level I get the following output:

[2014-11-19 16:10:37.511130] [0x000002ab9b05219e] [debug]   starting async_deq() ...
[2014-11-19 16:10:37.511283] [0x000002ab9b05219e] [debug]   starting thread sender thread ...
[2014-11-19 16:10:37.511332] [0x000002ab9b05219e] [debug]   starting asio ...
[2014-11-19 16:10:37.511531] [0x00000000040cdf94] [debug]   sending item: Hello0 0
[2014-11-19 16:10:37.513826] [0x000002ab9b05219e] [debug]   received item in qlistener_handler (via asio), item: Hello0 0, ec: system:0
[2014-11-19 16:10:37.614470] [0x00000000040cdf94] [debug]   sending item: Hello1 1
[2014-11-19 16:10:37.617070] [0x000002ab9b05219e] [debug]   received item in qlistener_handler (via asio), item: Hello1 1, ec: system:0
[2014-11-19 16:10:37.717460] [0x00000000040cdf94] [debug]   sending item: Hello2 2
[2014-11-19 16:10:37.719530] [0x000002ab9b05219e] [debug]   received item in qlistener_handler (via asio), item: Hello2 2, ec: system:0
[2014-11-19 16:10:40.721492] [0x000002ab9b05219e] [debug]   deque() aborted (via asio), ec: Connection timed out
[2014-11-19 16:10:40.721537] [0x000002ab9b05219e] [debug]   joining with sender thread ...

Notice that the queue listener times out when no more messages arrive, causing the boost::asio event loop to exit and the program to terminate.

File descriptor based queues - overview

On UNIX systems it's fairly common that processes communicate via pipes. A process creates a half duplex pipe, forks off a child which inherits the pipe file descriptors. The parent and child then communicate through the pipe.
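The pipe set-up described above can be sketched with plain POSIX calls. The function spawn_with_pipes below is a hypothetical helper written for this illustration; it is not the spawnPipeChild function used later in this post and its signature is an assumption. It creates two half duplex pipes, forks, connects the child's stdin and stdout to the pipes and executes the given program.

```cpp
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// hypothetical helper: fork/exec 'path' with stdin/stdout connected to two pipes
// returns the child pid or -1 on failure, fdread/fdwrite are the parent's pipe ends
pid_t spawn_with_pipes(std::string const&path,std::vector<std::string>const&args,int&fdread,int&fdwrite){
  int toChild[2],fromChild[2];
  if(pipe(toChild)!=0)return -1;
  if(pipe(fromChild)!=0){
    close(toChild[0]);close(toChild[1]);
    return -1;
  }
  // build argv before forking (the pointers stay valid as long as 'args' lives)
  std::vector<char*>argv;
  for(auto const&a:args)argv.push_back(const_cast<char*>(a.c_str()));
  argv.push_back(nullptr);

  pid_t pid=fork();
  if(pid<0){
    close(toChild[0]);close(toChild[1]);close(fromChild[0]);close(fromChild[1]);
    return -1;
  }
  if(pid==0){
    // child: read from 'toChild', write to 'fromChild'
    dup2(toChild[0],STDIN_FILENO);
    dup2(fromChild[1],STDOUT_FILENO);
    close(toChild[0]);close(toChild[1]);close(fromChild[0]);close(fromChild[1]);
    execv(path.c_str(),argv.data());
    _exit(127);   // only reached if exec failed
  }
  // parent: keep the write end of 'toChild' and the read end of 'fromChild'
  close(toChild[0]);
  close(fromChild[1]);
  fdwrite=toChild[1];
  fdread=fromChild[0];
  return pid;
}
```

The parent writes to fdwrite and reads from fdread. Closing fdwrite signals end of input to the child, after which the parent should collect the child with waitpid.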

Since a pipe is a stream of bytes we must introduce message boundaries if we want to send messages over the pipe. A common way of doing this is to first serialise the message into a string and then base64 encode the string. Finally a newline is inserted at the end, marking the message boundary. If the message is very long, newline characters may be inserted every so often in the byte stream. Clearly the receiver must do exactly the opposite on the other side of the pipe when reconstructing the message.

The asio::fdenq_queue and asio::fddeq_queue were designed specifically to be used this way - even though there are other ways to use them.

The queues are not thread safe since it makes little sense to have multiple readers or writers on a pipe. Each queue is initialised with a file descriptor which in the case of asio::fdenq_queue must be writable. The other queue must have a readable file descriptor. Since creating the file descriptors is not a queuing problem, it is up to the user of the queues to create them one way or another.
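For a queue built on a file descriptor, waiting until a message can be dequeued boils down to waiting until the descriptor is readable. The sketch below shows one way of doing that with select and a timeout. The helper wait_readable is hypothetical and not taken from the fd queue implementation, std::errc::timed_out stands in for boost::asio::error::timed_out, and an interrupted call (EINTR) is simply reported as an error instead of being retried.

```cpp
#include <cerrno>
#include <cstddef>
#include <system_error>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

// hypothetical helper: wait at most ms milliseconds until fd is readable
bool wait_readable(int fd,std::size_t ms,std::error_code&ec){
  // set of descriptors to watch (only one here)
  fd_set rset;
  FD_ZERO(&rset);
  FD_SET(fd,&rset);

  // convert the timeout to seconds and microseconds
  struct timeval tmo;
  tmo.tv_sec=static_cast<long>(ms/1000);
  tmo.tv_usec=static_cast<long>((ms%1000)*1000);

  int stat=select(fd+1,&rset,nullptr,nullptr,&tmo);
  if(stat<0){
    // select failed (including EINTR)
    ec=std::error_code(errno,std::generic_category());
    return false;
  }
  if(stat==0){
    // nothing to read within the timeout
    ec=std::make_error_code(std::errc::timed_out);
    return false;
  }
  ec.clear();
  return true;
}
```

Waiting until a message can be enqueued works the same way, with the descriptor placed in the write set (the third argument of select) instead of the read set.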

The queue_sender<queue_type> supports the asio::fdenq_queue as template parameter whereas the queue_listener<queue_type> supports the asio::fddeq_queue as parameter. In other words, the queues work a little differently than the asio::simple_queue and the asio::polldir_queue - in a sense they only represent endpoints of queues.

An example using file descriptor based queues

I'll show a small example of how to use the fd type queues here. The code spawns a child process which reads lines from stdin and writes them back to stdout. The process I use is /bin/cat.

The program is similar to the previous one. The exception is that sending of messages is done asynchronously, so there is an asynchronous callback function which is invoked when a message can be sent.

To set up the file descriptors I'll use a separate function especially designed to spawn a process and at the same time create two half duplex pipes connecting the parent and the child. The function is simple, but again tedious to write. For now, assume the function exists (spawnPipeChild). Here is the code showing how the fd queues can be used:

#include "asio-extensions/asio_queue.hpp"
#include "utils/sysUtils.h"
#include <vector>
#include <functional>
#include <iostream>
#include <string>
#include <sys/wait.h>
#include <boost/log/trivial.hpp>
#include <boost/lexical_cast.hpp>

using namespace std;
using namespace utils;
using namespace std::placeholders;
namespace asio=boost::asio;

// ----- some constants -----
namespace {
size_t msgcount{0};                 // track #of messages sent
constexpr size_t maxmsg{5};        // we'll send this many messages
constexpr size_t tmo_deq_ms{2000};  // timeout for waiting async. for a message
constexpr size_t tmo_enq_ms{1000};  // timeout waiting to be able to send a message async.
}
// io_service object (we are asio based)
boost::asio::io_service ios;

// serialize an object to be sent (notice: message boundaries are on '\n' characters)
// (normally messages would be serialized and then base64 encoded)
std::function<void(std::ostream&,string const&)>serialiser=[](std::ostream&os,string const&s){
  os<<s;
};
// de-serialize an object we received (notice: message boundaries are on '\n' characters)
// (normally messages would be base64-decoded and then deserialised)
std::function<string(istream&)>deserialiser=[](istream&is){
  string line;
  getline(is,line);
  return line;
};
// queue types used by parent
// (these are queues used by the parent - the child reads/writes directly to stdin/stdout)
using enq_t=asio::fdenq_queue<string,decltype(serialiser)>;
using deq_t=asio::fddeq_queue<string,decltype(deserialiser)>;

// handle a message we received asynchronously
void qlistener_handler(boost::system::error_code const&ec,string msg,asio::queue_listener<deq_t>*ql){
  if(ec!=0)BOOST_LOG_TRIVIAL(info)<<"deque() aborted (via asio), cause: "<<ec.message();
  else{
    // kick off waiting for another message asynchronously
    BOOST_LOG_TRIVIAL(info)<<"received msg in qlistener_handler (via asio), msg: \""<<msg<<"\", cause: "<<ec;
    ql->timed_async_deq(std::bind(qlistener_handler,_1,_2,ql),tmo_deq_ms);
  }
}
// handler called asynchronously when 'it is possible' to send a message through the pipe
void qsender_handler(boost::system::error_code const&ec,asio::queue_sender<enq_t>*qs){
  // on error, log the cause and stop sending
  if(ec)BOOST_LOG_TRIVIAL(info)<<"queue sender interrupted (via asio): ignoring callback, cause: "<<ec;
  else{
    // check if we are done
    if(msgcount==maxmsg)return;

    // kick off sending another message asynchronously
    string msg{boost::lexical_cast<string>(msgcount++)};
    qs->timed_async_enq(msg,std::bind(qsender_handler,_1,qs),tmo_enq_ms);
  }
}
  }
}
// main for test program
int main(){
  try{
    // spawn '/bin/cat' as child process
    int fdread,fdwrite;
    int cpid=spawnPipeChild("/bin/cat",vector<string>{"cat"},fdread,fdwrite,true,,".");

    // enclose queues in a scope so that fds are closed automatically when leaving scope
    // (if queues are not destroyed, the child process will hang and wait for more input)
    // (we could of course just close the fds ourselves ... but why not be lazy ...)
    {
      // error code from asio calls
      boost::system::error_code ec;

      // create queues (specify to close fds on destruction)
      enq_t qsend{fdwrite,serialiser,true};
      deq_t qrecv{fdread,deserialiser,true};

      // setup queue sender/listener
      asio::queue_sender<enq_t>qsender(::ios,&qsend);
      asio::queue_listener<deq_t>qlistener(::ios,&qrecv);

      // kick off sending messages asynchronously
      BOOST_LOG_TRIVIAL(info)<<"start sending messages asynchronously ...";
      string msg{boost::lexical_cast<string>(msgcount++)};
      qsender.timed_async_enq(msg,std::bind(qsender_handler,_1,&qsender),tmo_enq_ms);

      // kick off listening for messages asynchronously
      BOOST_LOG_TRIVIAL(info)<<"starting waiting for asio message ...";
      qlistener.timed_async_deq(std::bind(qlistener_handler,_1,_2,&qlistener),tmo_deq_ms);

      // run asio
      BOOST_LOG_TRIVIAL(info)<<"starting asio ...";
      ::ios.run();
    }
    // wait for child
    BOOST_LOG_TRIVIAL(info)<<"waiting for child (pid: "<<cpid<<") ..."<<endl;
    int waitstat;
    while(waitpid(cpid,&waitstat,0)!=cpid);
  }
  catch(std::exception const&e){
    cerr<<"main: caught exception: "<<e.what()<<endl;
  }
}

Conclusions

Lots of non-transactional code uses boost::asio for messaging. However, much of that code does not deal with messages separately from application code. Instead messages are constructed inside application code directly from various sources of bytes - typically asio services providing streams of bytes or sequences of bytes.

I believe that at the application level we should think about messages, not streams of bytes. Using asio queues allows us to get the abstraction right and not worry about bytes popping in and out of application logic.

By including time-based functionality directly in the asio queues, the code becomes simpler since there is no need for separate timers together with corresponding variables that must be managed.

With the support for waiting until it is possible to enq or deq, throttling of messages is simple to manage. Queues blocking a pathway can be designed so that message flows are managed properly upstream in a chain of components connected by queues.

I've used the code myself in architectures where communication is based on asynchronous messaging. Ignoring the horrendous C++ compilation errors from template based code, I have found that using asio together with asio queues can make life a lot simpler than dealing directly with sequences of bytes which must be converted to and from messages. As usual, it's not if you have to deal with bytes and messages, it is where and when in the code you do it. Doing it in the correct place at the correct time, i.e., separating infrastructure concerns from functional concerns, is the right way to go. The alternative is not.

When time allows I'll publish a few more asio queues and also put them up on github. In case you are interested in getting the code, please feel free to email me at hansewetz@hotmail.com

Below I've included the source code of the asio implementation for reference.

Code

Since I haven't published the code on github yet, I'll supply it directly here on the blog for reference - it's not so nice, but may be useful.

queue_listener - code

Here is a straight listing of the queue_listener:

#ifndef __QUEUE_LISTENER_H__
#define __QUEUE_LISTENER_H__
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <memory>
#include <utility>
#include <thread>
namespace boost{
namespace asio{

// forward decl
class queue_listener_impl;
template<typename Impl=queue_listener_impl>class basic_queue_listener_service;

// --- IO Object (used by client) -----------------------------
template<typename Service,typename Queue>
class basic_queue_listener:public boost::asio::basic_io_object<Service>{
public:
  // ctor
  explicit basic_queue_listener(boost::asio::io_service&io_service,Queue*q):
      boost::asio::basic_io_object<Service>(io_service),q_(q){
  }
  // async deq operation
  template <typename Handler>
  void async_deq(Handler handler) {
    this->service.async_deq(this->implementation,q_,handler);
  }
  // async deq operation - timed
  template <typename Handler>
  void timed_async_deq(Handler handler,std::size_t ms) {
    this->service.timed_async_deq(this->implementation,q_,handler,ms);
  }
  // wait until we can deq a message from queue in async mode
  template <typename Handler>
  void async_wait_deq(Handler handler) {
    this->service.async_wait_deq(this->implementation,q_,handler);
  }
  // wait until we can deq a message from queue in async mode - timed
  template <typename Handler>
  void timed_async_wait_deq(Handler handler,std::size_t ms) {
    this->service.timed_async_wait_deq(this->implementation,q_,handler,ms);
  }
  // sync deq operation (blocking)
  std::pair<bool,typename Queue::value_type>sync_deq(boost::system::error_code&ec){
    return this->service.sync_deq(this->implementation,q_,ec);
  }
private:
  Queue*q_;
};
// typedefs for using standard queue listeners
template<typename Queue>using queue_listener=basic_queue_listener<basic_queue_listener_service<>,Queue>;

// --- service class -----------------------------
// (for one io_service, only one object created)
template<typename Impl>
class basic_queue_listener_service:public boost::asio::io_service::service{
public:
  // required to have id of service
 static boost::asio::io_service::id id;

  // ctor
  explicit basic_queue_listener_service(boost::asio::io_service&io_service):
      boost::asio::io_service::service(io_service){
  }
  // dtor
  ~basic_queue_listener_service(){
  }
  // get a typedef  for implementation
  using implementation_type=std::shared_ptr<Impl>;

  // mandatory (construct an implementation object)
  void construct(implementation_type&impl){
      impl.reset(new Impl(this->get_io_service()));
  }
  // mandatory (destroy an implementation object)
  void destroy(implementation_type&impl){
    impl.reset();
  }
  // async deq operation
  template <typename Handler,typename Queue>
  void async_deq(implementation_type&impl,Queue*q,Handler handler){
    // this is a non-blocking operation so we are OK calling impl object in this thread
    impl->async_deq(impl,q,handler);
  }
  // async deq operation - timed
  template <typename Handler,typename Queue>
  void timed_async_deq(implementation_type&impl,Queue*q,Handler handler,std::size_t ms){
    // this is a non-blocking operation so we are OK calling impl object in this thread
    impl->timed_async_deq(impl,q,handler,ms);
  }
  // async wait operation
  template <typename Handler,typename Queue>
  void async_wait_deq(implementation_type&impl,Queue*q,Handler handler){
    // this is a non-blocking operation so we are OK calling impl object in this thread
    impl->async_wait_deq(impl,q,handler);
  }
  // async wait operation - timed
  template <typename Handler,typename Queue>
  void timed_async_wait_deq(implementation_type&impl,Queue*q,Handler handler,std::size_t ms){
    // this is a non-blocking operation so we are OK calling impl object in this thread
    impl->timed_async_wait_deq(impl,q,handler,ms);
  }
  // sync deq operation (blocking)
  template <typename Queue>
  std::pair<bool,typename Queue::value_type>sync_deq(implementation_type&impl,Queue*q,boost::system::error_code&ec){
    return impl->sync_deq(q,ec);
  }
private:
  // shutdown service (required)
  void shutdown_service(){
  }
};

// definition of id of service (required)
template <typename Impl>
boost::asio::io_service::id basic_queue_listener_service<Impl>::id;

// --- implementation -----------------------------
class queue_listener_impl{
public:
  // ctor (set up work queue for io_service so we don't bail out when executing run())
  queue_listener_impl(boost::asio::io_service&post_io_service):
      impl_work_(new boost::asio::io_service::work(impl_io_service_)),
      impl_thread_([&](){impl_io_service_.run();}),
      post_io_service_(post_io_service){
  }
  // dtor (clear work queue, stop io service and join thread)
  ~queue_listener_impl(){
    impl_work_.reset(nullptr);
    impl_io_service_.stop();
    if(impl_thread_.joinable())impl_thread_.join();
  }
public:
  // deque message (post request to thread)
  template<typename Handler,typename Queue>
  void async_deq(std::shared_ptr<queue_listener_impl>impl,Queue*q,Handler handler){
    impl_io_service_.post(deq_operation<Handler,Queue>(impl,post_io_service_,q,handler));
  }
  // deque message (post request to thread) - timed
  template<typename Handler,typename Queue>
  void timed_async_deq(std::shared_ptr<queue_listener_impl>impl,Queue*q,Handler handler,std::size_t ms){
    impl_io_service_.post(deq_operation<Handler,Queue>(impl,post_io_service_,q,handler,ms));
  }
  // wait to deq message (post request to thread)
  template<typename Handler,typename Queue>
  void async_wait_deq(std::shared_ptr<queue_listener_impl>impl,Queue*q,Handler handler){
    impl_io_service_.post(wait_deq_operation<Handler,Queue>(impl,post_io_service_,q,handler));
  }
  // wait to deq message (post request to thread) - timed
  template<typename Handler,typename Queue>
  void timed_async_wait_deq(std::shared_ptr<queue_listener_impl>impl,Queue*q,Handler handler,std::size_t ms){
    impl_io_service_.post(wait_deq_operation<Handler,Queue>(impl,post_io_service_,q,handler,ms));
  }
  // dequeue message (blocking deq)
  template<typename Queue>
  std::pair<bool,typename Queue::value_type>sync_deq(Queue*q,boost::system::error_code&ec){
    return q->deq(ec);
  }
private:
  // function object calling blocking deq() on queue
  template <typename Handler,typename Queue>
  class deq_operation{
  public:
    // ctor
    deq_operation(std::shared_ptr<queue_listener_impl>impl,boost::asio::io_service&io_service,Queue*q,Handler handler):
        wimpl_(impl),io_service_(io_service),work_(io_service),q_(q),handler_(handler),is_timed_(false),ms_(0) {
    }
    // ctor - timed
    deq_operation(std::shared_ptr<queue_listener_impl>impl,boost::asio::io_service&io_service,Queue*q,Handler handler,std::size_t ms):
        wimpl_(impl),io_service_(io_service),work_(io_service),q_(q),handler_(handler),is_timed_(true),ms_(ms){
    }
    // function calling implementation object - runs in the thread created in ctor
    void operator()(){
      // make sure implementation object is still valid
      std::shared_ptr<queue_listener_impl>impl{wimpl_.lock()};

      // if valid, go ahead and do blocking call on queue, otherwise post aborted message
      boost::system::error_code ec;
      std::pair<bool,typename Queue::value_type>ret;
      if(impl){
        if(is_timed_)ret=q_->timed_deq(ms_,ec);
        else ret=q_->deq(ec);
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,ec,ret.second));
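      }else{
        // implementation object is gone - report the operation as aborted
        ec=boost::asio::error::operation_aborted;
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,ec,typename Queue::value_type()));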
      }
    }
  private:
    std::weak_ptr<queue_listener_impl>wimpl_;
    boost::asio::io_service&io_service_;
    boost::asio::io_service::work work_;
    Queue*q_;
    Handler handler_;
    bool is_timed_;
    std::size_t ms_;
  };
  // function object calling blocking wait() on queue
  template <typename Handler,typename Queue>
  class wait_deq_operation{
  public:
    // ctor
    wait_deq_operation(std::shared_ptr<queue_listener_impl>impl,boost::asio::io_service &io_service,Queue*q,Handler handler):
        wimpl_(impl),io_service_(io_service),work_(io_service),q_(q),handler_(handler),is_timed_(false),ms_(0){
    }
    // ctor - timed
    wait_deq_operation(std::shared_ptr<queue_listener_impl>impl,boost::asio::io_service &io_service,Queue*q,Handler handler,std::size_t ms):
        wimpl_(impl),io_service_(io_service),work_(io_service),q_(q),handler_(handler),is_timed_(true),ms_(ms){
    }
    // function calling implementation object - runs in the thread created in ctor
    void operator()(){
      // make sure implementation object is still valid
      std::shared_ptr<queue_listener_impl>impl{wimpl_.lock()};

      // if valid, go ahead and do (potentially) blocking call on queue, otherwise post aborted message
      boost::system::error_code ec;
      if(impl){
        if(is_timed_)q_->timed_wait_deq(ms_,ec);
        else q_->wait_deq(ec);
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,ec));
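      }else{
        // implementation object is gone - report the operation as aborted
        ec=boost::asio::error::operation_aborted;
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,ec));
      }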
    }
  private:
    std::weak_ptr<queue_listener_impl>wimpl_;
    boost::asio::io_service&io_service_;
    boost::asio::io_service::work work_;
    Queue*q_;
    Handler handler_;
    bool is_timed_;
    std::size_t ms_;
  };
  // private data
  boost::asio::io_service impl_io_service_;
  std::unique_ptr<boost::asio::io_service::work>impl_work_;
  std::thread impl_thread_;
  boost::asio::io_service&post_io_service_;
};
}
}
#endif
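
The operation objects above keep only a std::weak_ptr to the implementation and lock it before touching the queue, so an operation that is still sitting in the work queue after the implementation has been destroyed can detect this and report an abort instead. Below is a minimal standalone sketch of that guard using only the standard library. The names impl and guarded_op are hypothetical stand-ins and are not part of the listing:

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for queue_listener_impl.
struct impl {};

// Function object that only runs its work while the implementation is alive.
class guarded_op {
public:
  guarded_op(std::shared_ptr<impl> const& p, std::function<std::string()> work)
      : wimpl_(p), work_(std::move(work)) {}

  // Returns {true, result} if the implementation is still alive,
  // {false, ""} (the "aborted" case) otherwise.
  std::pair<bool, std::string> operator()() const {
    std::shared_ptr<impl> p{wimpl_.lock()};
    if (!p) return std::make_pair(false, std::string());
    return std::make_pair(true, work_());
  }

private:
  std::weak_ptr<impl> wimpl_;           // does not keep the implementation alive
  std::function<std::string()> work_;   // the (potentially blocking) call
};
```

In the real classes the "aborted" branch corresponds to posting the handler with boost::asio::error::operation_aborted.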

queue_sender - code

Here is a straight listing of the queue_sender:

#ifndef __QUEUE_SENDER_H__
#define __QUEUE_SENDER_H__
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <memory>
#include <thread>
namespace boost{
namespace asio{

// forward decl
class queue_sender_impl;
template<typename Impl=queue_sender_impl>class basic_queue_sender_service;

// --- IO Object (used by client) -----------------------------
template<typename Service,typename Queue>
class basic_queue_sender:public boost::asio::basic_io_object<Service>{
public:
  // ctor
  explicit basic_queue_sender(boost::asio::io_service&io_service,Queue*q):
      boost::asio::basic_io_object<Service>(io_service),q_(q){
  }
  // async enq operation
  template <typename Handler>
  void async_enq(typename Queue::value_type val,Handler handler) {
    this->service.async_enq(this->implementation,q_,val,handler);
  }
  // async enq operation - timed
  template <typename Handler>
  void timed_async_enq(typename Queue::value_type val,Handler handler,std::size_t ms) {
    this->service.timed_async_enq(this->implementation,q_,val,handler,ms);
  }
  // wait until we can put a message in queue in async mode
  template <typename Handler>
  void async_wait_enq(Handler handler) {
    this->service.async_wait_enq(this->implementation,q_,handler);
  }
  // wait until we can put a message in queue in async mode - timed
  template <typename Handler>
  void timed_async_wait_enq(Handler handler,std::size_t ms) {
    this->service.timed_async_wait_enq(this->implementation,q_,handler,ms);
  }
  // sync enq operation (blocking)
  void sync_enq(typename Queue::value_type val,boost::system::error_code&ec){
    this->service.sync_enq(this->implementation,q_,val,ec);
  }
private:
  Queue*q_;
};
// typedefs for using standard queue senders
template<typename Queue>using queue_sender=basic_queue_sender<basic_queue_sender_service<>,Queue>;

// --- service class -----------------------------
// (for one io_service, only one object created)
template<typename Impl>
class basic_queue_sender_service:public boost::asio::io_service::service{
public:
  // required to have id of service
 static boost::asio::io_service::id id;

  // ctor
  explicit basic_queue_sender_service(boost::asio::io_service&io_service):
      boost::asio::io_service::service(io_service){
  }
  // dtor
  ~basic_queue_sender_service(){
  }
  // get a typedef  for implementation
  using implementation_type=std::shared_ptr<Impl>;

  // mandatory (construct an implementation object)
  void construct(implementation_type&impl){
      impl.reset(new Impl(this->get_io_service()));
  }
  // mandatory (destroy an implementation object)
  void destroy(implementation_type&impl){
    impl.reset();
  }
  // async enq operation
  template <typename Handler,typename Queue>
  void async_enq(implementation_type&impl,Queue*q,typename Queue::value_type val,Handler handler){
    // this is a non-blocking operation so we are OK calling impl object in this thread
    impl->async_enq(impl,q,val,handler);
  }
  // async enq operation - timed
  template <typename Handler,typename Queue>
  void timed_async_enq(implementation_type&impl,Queue*q,typename Queue::value_type val,Handler handler,std::size_t ms){
    // this is a non-blocking operation so we are OK calling impl object in this thread
    impl->timed_async_enq(impl,q,val,handler,ms);
  }
  // async wait operation
  template <typename Handler,typename Queue>
  void async_wait_enq(implementation_type&impl,Queue*q,Handler handler){
    // this is a non-blocking operation so we are OK calling impl object in this thread
    impl->async_wait_enq(impl,q,handler);
  }
  // async wait operation - timed
  template <typename Handler,typename Queue>
  void timed_async_wait_enq(implementation_type&impl,Queue*q,Handler handler,std::size_t ms){
    // this is a non-blocking operation so we are OK calling impl object in this thread
    impl->timed_async_wait_enq(impl,q,handler,ms);
  }
  // sync enq operation (blocking)
  template <typename Queue>
  void sync_enq(implementation_type&impl,Queue*q,typename Queue::value_type val,boost::system::error_code&ec){
    impl->sync_enq(q,val,ec);
  }
private:
  // shutdown service (required)
  void shutdown_service(){
  }
};
// definition of id of service (required)
template <typename Impl>
boost::asio::io_service::id basic_queue_sender_service<Impl>::id;

// --- implementation -----------------------------
class queue_sender_impl{
public:
  // ctor (set up work queue for io_service so we don't bail out when executing run())
  queue_sender_impl(boost::asio::io_service&post_io_service):
      impl_work_(new boost::asio::io_service::work(impl_io_service_)),
      impl_thread_([&](){impl_io_service_.run();}),
      post_io_service_(post_io_service){
  }
  // dtor (clear work queue, stop io service and join thread)
  ~queue_sender_impl(){
    impl_work_.reset(nullptr);
    impl_io_service_.stop();
    if(impl_thread_.joinable())impl_thread_.join();
  }
public:
  // enqueue message (post request to thread)
  template<typename Handler,typename Queue>
  void async_enq(std::shared_ptr<queue_sender_impl>impl,Queue*q,typename Queue::value_type val,Handler handler){
    impl_io_service_.post(enq_operation<Handler,Queue>(impl,post_io_service_,q,val,handler));
  }
  // enqueue message (post request to thread) - timed
  template<typename Handler,typename Queue>
  void timed_async_enq(std::shared_ptr<queue_sender_impl>impl,Queue*q,typename Queue::value_type val,Handler handler,std::size_t ms){
    impl_io_service_.post(enq_operation<Handler,Queue>(impl,post_io_service_,q,val,handler,ms));
  }
  // wait to enq message (post request to thread)
  template<typename Handler,typename Queue>
  void async_wait_enq(std::shared_ptr<queue_sender_impl>impl,Queue*q,Handler handler){
    impl_io_service_.post(wait_enq_operation<Handler,Queue>(impl,post_io_service_,q,handler));
  }
  // wait to enq message (post request to thread) - timed
  template<typename Handler,typename Queue>
  void timed_async_wait_enq(std::shared_ptr<queue_sender_impl>impl,Queue*q,Handler handler,std::size_t ms){
    impl_io_service_.post(wait_enq_operation<Handler,Queue>(impl,post_io_service_,q,handler,ms));
  }
  // enqueue message (blocking enq)
  template<typename Queue>
  void sync_enq(Queue*q,typename Queue::value_type val,boost::system::error_code&ec){
    q->enq(val,ec);
  }
private:
  // function object calling blocking enq() on queue
  template <typename Handler,typename Queue>
  class enq_operation{
  public:
    // ctor
    enq_operation(std::shared_ptr<queue_sender_impl>impl,boost::asio::io_service &io_service,Queue*q,typename Queue::value_type val,Handler handler):
        wimpl_(impl),io_service_(io_service),work_(io_service),q_(q),val_(val),handler_(handler),timed_(false),ms_(0) {
    }
    // ctor - timed
    enq_operation(std::shared_ptr<queue_sender_impl>impl,boost::asio::io_service &io_service,Queue*q,typename Queue::value_type val,Handler handler,std::size_t ms):
        wimpl_(impl),io_service_(io_service),work_(io_service),q_(q),val_(val),handler_(handler),timed_(true),ms_(ms) {
    }
    // function calling implementation object - runs in the thread created in ctor
    void operator()(){
      // make sure implementation object is still valid
      std::shared_ptr<queue_sender_impl>impl{wimpl_.lock()};

      // if valid, go ahead and do (potentially) blocking call on queue, otherwise post aborted message
      boost::system::error_code ec;
      if(impl){
        bool ret;
        if(timed_)ret=q_->timed_enq(val_,ms_,ec);
        else ret=q_->enq(val_,ec);
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,ec));
      }else{
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,boost::asio::error::operation_aborted));
      }
    }
  private:
    std::weak_ptr<queue_sender_impl>wimpl_;
    boost::asio::io_service&io_service_;
    boost::asio::io_service::work work_;
    Queue*q_;
    typename Queue::value_type val_;
    Handler handler_;
    bool timed_;
    std::size_t ms_;
  };
  // function object calling blocking wait() on queue
  template <typename Handler,typename Queue>
  class wait_enq_operation{
  public:
    // ctor
    wait_enq_operation(std::shared_ptr<queue_sender_impl>impl,boost::asio::io_service &io_service,Queue*q,Handler handler):
        wimpl_(impl),io_service_(io_service),work_(io_service),q_(q),handler_(handler),timed_(false),ms_(0) {
    }
    // ctor - timed
    wait_enq_operation(std::shared_ptr<queue_sender_impl>impl,boost::asio::io_service &io_service,Queue*q,Handler handler,std::size_t ms):
        wimpl_(impl),io_service_(io_service),work_(io_service),q_(q),handler_(handler),timed_(true),ms_(ms) {
    }
    // function calling implementation object - runs in the thread created in ctor
    void operator()(){
      // make sure implementation object is still valid
      std::shared_ptr<queue_sender_impl>impl{wimpl_.lock()};

      // if valid, go ahead and do (potentially) blocking call on queue, otherwise post aborted message
      boost::system::error_code ec;
      if(impl){
        if(timed_)q_->timed_wait_enq(ms_,ec);
        else q_->wait_enq(ec);
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,ec));
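      }else{
        // implementation object is gone - report the operation as aborted
        ec=boost::asio::error::operation_aborted;
        this->io_service_.post(boost::asio::detail::bind_handler(handler_,ec));
      }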
    }
  private:
    std::weak_ptr<queue_sender_impl>wimpl_;
    boost::asio::io_service&io_service_;
    boost::asio::io_service::work work_;
    Queue*q_;
    Handler handler_;
    bool timed_;
    std::size_t ms_;
  };
  // private data
  boost::asio::io_service impl_io_service_;
  std::unique_ptr<boost::asio::io_service::work>impl_work_;
  std::thread impl_thread_;
  boost::asio::io_service&post_io_service_;
};
}
}
#endif

fddeq_queue - code

Here is a straight listing of the fddeq_queue:

#ifndef __FDDEQ_QUEUE_H__
#define __FDDEQ_QUEUE_H__
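#include "detail/queue_support.hpp"
#include <cassert>
#include <cerrno>
#include <utility>
#include <iostream>
#include <string>
#include <sstream>
#include <sys/select.h>
#include <unistd.h>
#include <boost/asio/error.hpp>
#include <boost/system/error_code.hpp>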

namespace boost{
namespace asio{

// a simple queue based on receiving messages separated by '\n'
// (if receiving serialised objects, the sender should have serialised and then base64 encoded them)
// (the tmo in ms is a message timeout - if no message starts arriving within the timeout, the function times out)
// (once we have started to read a message, the message will never time out)
// (the class is meant to be used in single threaded mode and is not thread safe)
template<typename T,typename DESER>
class fddeq_queue{
public:
  // typedef for value stored in queue
  using value_type=T;

  // default message separator
  constexpr static char NEWLINE='\n';

  // ctors,assign,dtor
  fddeq_queue(int fdread,DESER deser,bool closeOnExit=false,char sep=NEWLINE):fdread_(fdread),deser_(deser),sep_{sep},closeOnExit_(closeOnExit){}
  fddeq_queue(fddeq_queue const&)=delete;
  fddeq_queue(fddeq_queue&&)=default;
  fddeq_queue&operator=(fddeq_queue const&)=delete;
  fddeq_queue&operator=(fddeq_queue&&)=default;
  ~fddeq_queue(){if(closeOnExit_)detail::queue_support::eclose(fdread_,false);}

  // dequeue a message (return.first == false if deq() was disabled)
  std::pair<bool,T>deq(boost::system::error_code&ec){
    T ret{recvwait(0,ec,true)};
    if(ec!=boost::system::error_code())return std::make_pair(false,ret);
    return std::make_pair(true,ret);
  }
  // dequeue a message (return.first == false if deq() was disabled) - timeout if waiting too long
  std::pair<bool,T>timed_deq(std::size_t ms,boost::system::error_code&ec){
    T ret{recvwait(ms,ec,true)};
    if(ec!=boost::system::error_code())return std::make_pair(false,ret);
    return std::make_pair(true,ret);
  }
  // wait until we can retrieve a message from queue
  bool wait_deq(boost::system::error_code&ec){
    recvwait(0,ec,false);
    if(ec.value()!=0)return false;
    return true;
  }
  // wait until we can retrieve a message from queue -  timeout if waiting too long
  bool timed_wait_deq(std::size_t ms,boost::system::error_code&ec){
    recvwait(ms,ec,false);
    if(ec==boost::asio::error::timed_out)return false;
    if(ec.value()!=0)return false;
    return true;
  }
  // get underlying file descriptor
  int getfd()const{
    return fdread_;
  }
private:
  // deserialise an object from an fd stream
  // or wait until there is a message to read - in this case, a default constructed object is returned
  T recvwait(std::size_t ms,boost::system::error_code&ec,bool getMsg){
    T ret;                            // return value from this function (default ctor if no error)
    std::stringstream strstrm;        // collect read chars in a stringstream

    // loop until we have a message (or until we timeout)
    while(true){
      // setup to listen on fd descriptor
      fd_set input;
      FD_ZERO(&input);
      FD_SET(fdread_,&input);
      int maxfd=fdread_;

      // setup for timeout (once we start getting a message we don't time out)
      struct timeval tmo;
      tmo.tv_sec=ms/1000;
      tmo.tv_usec=(ms%1000)*1000;

      // block on select - timeout if configured
      assert(maxfd!=-1);
      int n=::select(++maxfd,&input,NULL,NULL,ms>0?&tmo:NULL);

      // check for error
      if(n<0){
        ec=boost::system::error_code(errno,boost::system::generic_category());
        return T{};
      }
      // check for tmo
      if(n==0){
        ec=boost::asio::error::timed_out;
        return T{};
      }
      // check if we got some data
      if(FD_ISSET(fdread_,&input)){
        // if we are only checking if we have a message we are done here
        if(!getMsg){
          ec= boost::system::error_code{};
          return T{};
        }
        // read up to and including the separator
        while(true){
          // read next character in message (retry if interrupted by a signal)
          char c;
          ssize_t stat;
          while((stat=::read(fdread_,&c,1))==-1&&errno==EINTR){}
          if(stat==0){
            // writer closed the fd before a complete message arrived
            ec=boost::asio::error::eof;
            return T{};
          }
          if(stat!=1){
            // create a boost::system::error_code from errno
            ec=boost::system::error_code(errno,boost::system::generic_category());
            return T{};
          }
          // save character just read
          strstrm<<c;

          // if we reached the separator, deserialise and return the message (separator included)
          if(c==sep_)return deser_(strstrm);
        }
      }
      // reset tmo to zero ms since we don't time out once we start reading a message
      ms=0;
    }
  }
  // state of queue
  int fdread_;                           // file descriptor to read from
  DESER deser_;                          // de-serialiser
  char sep_;                             // message separator
  bool closeOnExit_;                     // close fd on exit
};
}
}
#endif
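
The core of fddeq_queue is the select() followed by a byte-wise read() up to the separator. The sketch below isolates that idea so it can be tried on a plain pipe without Boost. It is an illustration under stated assumptions, not the class itself: the function read_message and the enum read_status are hypothetical names, a std::string is returned instead of a deserialised object, and end-of-file is reported as its own status.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>
#include <sys/select.h>
#include <unistd.h>

// Result codes for the sketch (hypothetical, not part of fddeq_queue).
enum class read_status { ok, timed_out, closed, error };

// Read one message terminated by 'sep' from 'fd' into 'out'.
// Waits at most 'ms' milliseconds for the first byte (ms == 0 waits forever);
// once data is available the rest of the message is read without a timeout.
inline read_status read_message(int fd, std::size_t ms, std::string& out, char sep = '\n') {
  out.clear();

  // wait until the fd is readable or the timeout expires
  fd_set input;
  FD_ZERO(&input);
  FD_SET(fd, &input);
  struct timeval tmo;
  tmo.tv_sec = ms / 1000;
  tmo.tv_usec = (ms % 1000) * 1000;
  int n = ::select(fd + 1, &input, nullptr, nullptr, ms > 0 ? &tmo : nullptr);
  if (n < 0) return read_status::error;
  if (n == 0) return read_status::timed_out;

  // read byte by byte up to and including the separator
  while (true) {
    char c;
    ssize_t stat;
    // read() signals an interrupted call with -1 and errno == EINTR
    while ((stat = ::read(fd, &c, 1)) == -1 && errno == EINTR) {}
    if (stat == 0) return read_status::closed;  // writer closed its end
    if (stat < 0) return read_status::error;
    out.push_back(c);
    if (c == sep) return read_status::ok;
  }
}
```

Reading one byte at a time is slow but never consumes bytes belonging to the next message, which is presumably why the listing does the same.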

fdenq_queue - code

Here is a straight listing of the fdenq_queue:

#ifndef __FDENQ_QUEUE_H__
#define __FDENQ_QUEUE_H__
#include "detail/queue_support.hpp"
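#include <cassert>
#include <cerrno>
#include <sstream>
#include <string>
#include <utility>
#include <sys/select.h>
#include <unistd.h>
#include <boost/asio/error.hpp>
#include <boost/system/error_code.hpp>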

namespace boost{
namespace asio{
  
// a simple queue based on sending messages separated by '\n'
// (if sending objects, first serialise the object, then base64 encode it in the serialiser)
// (the tmo in ms is a message timeout - if the fd does not become writable within the timeout, the function times out)
// (once we have started to send a message, the message will never time out)
// (the class is meant to be used in single threaded mode and is not thread safe)
template<typename T,typename SERIAL>
class fdenq_queue{
public:
  // typedef for value stored in queue
  using value_type=T;

  // default message separator
  constexpr static char NEWLINE='\n';

  // ctors,assign,dtor
  fdenq_queue(int fdwrite,SERIAL serial,bool closeOnExit=false,char sep=NEWLINE):fdwrite_(fdwrite),serial_(serial),sep_(sep),closeOnExit_(closeOnExit){}
  fdenq_queue(fdenq_queue const&)=delete;
  fdenq_queue(fdenq_queue&&)=default;
  fdenq_queue&operator=(fdenq_queue const&)=delete;
  fdenq_queue&operator=(fdenq_queue&&)=default;
  ~fdenq_queue(){if(closeOnExit_)detail::queue_support::eclose(fdwrite_,false);}
  
  // enqueue a message (returns false if the message could not be written - ec is then set)
  bool enq(T t,boost::system::error_code&ec){
    return this->sendwait(&t,0,ec,true);
  }
  // enqueue a message (returns false if the message could not be written - ec is then set) - timeout if waiting too long
  bool timed_enq(T t,std::size_t ms,boost::system::error_code&ec){
    return this->sendwait(&t,ms,ec,true);
  }
  // wait until we can put a message in queue
  bool wait_enq(boost::system::error_code&ec){
    return this->sendwait(nullptr,0,ec,false);
  }
  // wait until we can put a message in queue - timeout if waiting too long
  bool timed_wait_enq(std::size_t ms,boost::system::error_code&ec){
    return this->sendwait(nullptr,ms,ec,false);
  }
  // get underlying file descriptor
  int getfd()const{
    return fdwrite_;
  }
private:
  // serialise an object to an fd or wait until we time out
  // (returns true if we could serialise the object, false otherwise - error code will be non-zero if false)
  bool sendwait(T const*t,std::size_t ms,boost::system::error_code&ec,bool sendMsg){
    std::stringstream strstrm;        // serialised object

    // serialise object and get it as a string
    // (no need if we don't need to send object)
    std::string str;
    if(sendMsg){
      serial_(strstrm,*t);
      strstrm<<sep_;
      str=strstrm.str();
    }
    std::string::iterator sbegin{str.begin()};
    std::string::iterator send{str.end()};

    // loop until we have written message or timed out
    while(true){
      // setup to listen on fd descriptor
      fd_set output;
      FD_ZERO(&output);
      FD_SET(fdwrite_,&output);
      int maxfd=fdwrite_;

      // setup for timeout (once we start writing a message we won't time out)
      struct timeval tmo;
      tmo.tv_sec=ms/1000;
      tmo.tv_usec=(ms%1000)*1000;

      // block on select - timeout if configured
      assert(maxfd!=-1);
      int n=::select(++maxfd,NULL,&output,NULL,ms>0?&tmo:NULL);

      // check for error
      if(n<0){
        ec=boost::system::error_code(errno,boost::system::generic_category());
        return false;
      }
      // check for tmo
      if(n==0){
        ec=boost::asio::error::timed_out;
        return false;
      }
      // check if we can write some data
      if(FD_ISSET(fdwrite_,&output)){
        // if we are only checking if we can send a message
        if(!sendMsg){
          ec= boost::system::error_code{};
          return true;
        }
        // write as much as we can
        ssize_t count{0};
        while(sbegin!=send&&count!=n){
          // write next character in message
          char c{*sbegin++};
          ssize_t stat;
          // write() signals an interrupted call with -1 and errno==EINTR
          while((stat=::write(fdwrite_,&c,1))==-1&&errno==EINTR){}
          if(stat!=1){
            // create a boost::system::error_code from errno
            ec=boost::system::error_code(errno,boost::system::generic_category());
            return false;
          }
          ++count;
        }
      }
      // check if we are done
      if(sbegin==send)return true;

      // reset tmo to zero ms since we don't time out once we start writing a message
      ms=0;
    }
  }
  // state of queue
  int fdwrite_;                          // file descriptor to serialise objects to
  SERIAL serial_;                        // serialiser
  char sep_;                             // message separator
  bool closeOnExit_;                     // close fd on exit
};
}
}
#endif
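
The write side boils down to "serialise, append the separator, then keep writing until every byte is out". A minimal sketch of that loop follows. Note that it differs from the listing in two ways that are my assumptions rather than the author's design: it writes the remaining buffer in one call instead of one byte per select() round, and it skips the select()-based timeout entirely. The name write_message is hypothetical.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>
#include <unistd.h>

// Write 'msg' followed by 'sep' to 'fd', retrying on EINTR and on short writes.
// Returns true when every byte was written, false on error (errno is left set).
inline bool write_message(int fd, std::string const& msg, char sep = '\n') {
  std::string const buf{msg + sep};
  std::size_t written{0};
  while (written < buf.size()) {
    ssize_t stat = ::write(fd, buf.data() + written, buf.size() - written);
    if (stat < 0) {
      if (errno == EINTR) continue;  // interrupted before anything was written: retry
      return false;
    }
    // a short write is not an error - continue with the remaining bytes
    written += static_cast<std::size_t>(stat);
  }
  return true;
}
```

A message written this way can be read back on the other end of a pipe by any reader that splits on the same separator.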

polldir_queue - code

Here is a straight listing of the polldir_queue:

#ifndef __POLLDIR_QUEUE_H__
#define __POLLDIR_QUEUE_H__
#include "detail/queue_support.hpp"
#include <string> 
#include <utility>
#include <list>
#include <boost/thread/thread_time.hpp>
#include <boost/filesystem.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>

namespace boost{
namespace asio{

namespace fs=boost::filesystem;
namespace ipc=boost::interprocess;
namespace pt=boost::posix_time;

// a simple threadsafe/interprocess-safe queue using directory as queue and files as storage media for queue items
// (mutex/condition variable names are derived from the queue name)
// (enq/deq have locks around them so that we cannot read partial messages)
template<typename T,typename DESER,typename SERIAL>
class polldir_queue{
public:
  // typedef for value stored in queue
  // (need this so we can create an item with default ctor)
  using value_type=T;

  // ctors,assign,dtor
  // (if maxsize == 0 checking for max number of queue elements is ignored)
  polldir_queue(std::string const&qname,std::size_t maxsize,fs::path const&dir,DESER deser,SERIAL serial,bool removelocks):
      qname_(qname),maxsize_(maxsize),dir_(dir),deser_(deser),serial_(serial),removelocks_(removelocks),
      ipcmtx_(ipc::open_or_create,qname.c_str()),ipcond_(ipc::open_or_create,qname.c_str()){
    // make sure path is a directory
    if(!fs::is_directory(dir_))throw std::logic_error(std::string("polldir_queue::polldir_queue: dir_: ")+dir.string()+" is not a directory");
  }
  polldir_queue(polldir_queue const&)=delete;
  polldir_queue(polldir_queue&&)=default;
  polldir_queue&operator=(polldir_queue const&)=delete;
  polldir_queue&operator=(polldir_queue&&)=default;
  ~polldir_queue(){
    if(removelocks_)removeLockVariables(qname_);
  }
  // put a message into queue
  // (returns true if message was enqueued, false if enqueing was disabled)
  bool enq(T t,boost::system::error_code&ec){
    // wait until the state of the queue is such that we can enqueue an element
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    ipcond_.wait(lock,[&](){return !enq_enabled_||!fullNolock();});

    // if enq is disabled we'll return
    if(!enq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return false;
    }
    // we know we can now write message
    detail::queue_support::write(t,dir_,serial_);
    ipcond_.notify_all();
    ec=boost::system::error_code();
    return true;
  }
  // put a message into queue - timeout if waiting too long
  // (returns true if message was enqueued, false if enqueing was disabled)
  bool timed_enq(T t,std::size_t ms,boost::system::error_code&ec){
    // wait until the state of the queue is such that we can enqueue an element
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    boost::system_time const tmo_ms=boost::get_system_time()+boost::posix_time::milliseconds(ms);
    bool tmo=!ipcond_.timed_wait(lock,tmo_ms,[&](){return !enq_enabled_||!fullNolock();});

    if(tmo){
      ec=boost::asio::error::timed_out;
      return false;
    }
    if(!enq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return false;
    }
    // we know we can now write message
    detail::queue_support::write(t,dir_,serial_);
    ipcond_.notify_all();
    ec=boost::system::error_code();
    return true;
  }
  // wait until we can put a message in queue
  // (returns false if enqueing was disabled, else true)
  bool wait_enq(boost::system::error_code&ec){
    // wait for the state of queue is such that we can return something
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    ipcond_.wait(lock,[&](){return !enq_enabled_||!fullNolock();});

    // if enq is disabled we'll return
    if(!enq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return false;
    }
    // we know queue is not full
    ipcond_.notify_all();
    ec=boost::system::error_code();
    return true;
  }
  // wait until we can put a message in queue - timeout if waiting too long
  bool timed_wait_enq(std::size_t ms,boost::system::error_code&ec){
    // wait for the state of queue is such that we can return something
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    boost::system_time const tmo_ms=boost::get_system_time()+boost::posix_time::milliseconds(ms);
    bool tmo=!ipcond_.timed_wait(lock,tmo_ms,[&](){return !enq_enabled_||!fullNolock();});
    if(tmo){
      ec=boost::asio::error::timed_out;
      return false;
    }
    if(!enq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return false;
    }
    ipcond_.notify_all();
    ec=boost::system::error_code();
    return true;
  }
  // dequeue a message (return.first == false if deq() was disabled)
  std::pair<bool,T>deq(boost::system::error_code&ec){
    // wait for the state of queue is such that we can return something
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    ipcond_.wait(lock,[&](){return !deq_enabled_||!emptyNolock();});

    // if deq is disabled we'll return
    if(!deq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return std::make_pair(false,T{});
    }
    // we know the cache is not empty now so no need to check
    fs::path file{cache_.front()};
    cache_.pop_front();
    T ret{detail::queue_support::read<T>(file,deser_)};
    ipcond_.notify_all();
    ec=boost::system::error_code();
    return std::make_pair(true,ret);
  }
  // dequeue a message (return.first == false if deq() was disabled) - timeout if waiting too long
  std::pair<bool,T>timed_deq(std::size_t ms,boost::system::error_code&ec){
    // wait for the state of queue is such that we can return something
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    boost::system_time const tmo_ms=boost::get_system_time()+boost::posix_time::milliseconds(ms);
    bool tmo=!ipcond_.timed_wait(lock,tmo_ms,[&](){return !deq_enabled_||!emptyNolock();});

    // if deq is disabled or queue is empty return or timeout
    if(tmo){
      ec=boost::asio::error::timed_out;
       return std::make_pair(false,T{});
    }
    if(!deq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return std::make_pair(false,T{});
    }
    // we know the cache is not empty now so no need to check
    fs::path file{cache_.front()};
    cache_.pop_front();
    T ret{detail::queue_support::read<T>(file,deser_)};
    ipcond_.notify_all();
    ec=boost::system::error_code();
    return std::make_pair(true,ret);
  }
  // wait until we can retrieve a message from queue
  bool wait_deq(boost::system::error_code&ec){
    // wait for the state of queue is such that we can return something
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    ipcond_.wait(lock,[&](){return !deq_enabled_||!emptyNolock();});

    // check if dequeue was disabled
    if(!deq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return false;
    }
    // we know the cache is not empty now so no need to check
    fs::path file{cache_.front()};
    ipcond_.notify_all();
    ec=boost::system::error_code();
    return true;
  }
  // wait until we can retrieve a message from queue -  timeout if waiting too long
  bool timed_wait_deq(std::size_t ms,boost::system::error_code&ec){
    // wait for the state of queue is such that we can return something
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    boost::system_time const tmo_ms=boost::get_system_time()+boost::posix_time::milliseconds(ms);
    bool tmo=!ipcond_.timed_wait(lock,tmo_ms,[&](){return !deq_enabled_||!emptyNolock();});
    if(tmo){
      ec=boost::asio::error::timed_out;
      return false;
    }
    if(!deq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return false;
    }
    // we know the cache is not empty now so no need to check
    fs::path file{cache_.front()};
    ipcond_.notify_all();
    ec=boost::system::error_code();
    return true;
  }
  // cancel deq operations (will also release blocking threads)
  void disable_deq(bool disable){
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    deq_enabled_=!disable;
    ipcond_.notify_all();
  }
  // cancel enq operations (will also release blocking threads)
  void disable_enq(bool disable){
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    enq_enabled_=!disable;
    ipcond_.notify_all();
  }
  // set max size of queue
  void set_maxsize(std::size_t maxsize){
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    maxsize_=maxsize;
    // a larger max size may allow blocked enqueuers to proceed
    ipcond_.notify_all();
  }
  // check if queue is empty
  bool empty()const{
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    return emptyNolock();
  }
  // check if queue is full
  bool full()const{
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    return fullNolock();
  }
  // get #of items in queue
  std::size_t size()const{
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    return sizeNolock();
  }
  // get max items in queue
  std::size_t maxsize()const{
    ipc::scoped_lock<ipc::named_mutex>lock(ipcmtx_);
    return maxsize_;
  }
  // get name of queue
  std::string qname()const{
    return qname_;
  }
  // remove lock variables for queue
  static void removeLockVariables(std::string const&qname){
    detail::queue_support::removeLockVariables(qname);
  }
private:
  // check if queue is full
  // (lock must be held when calling this function)
  bool fullNolock()const{
    // if maxsize_ == 0 we ignore checking the size of the queue
    if(maxsize_==0)return false;
    return sizeNolock()>=maxsize_;
  }
  // check if queue is empty 
  // (lock must be held when calling this function)
  bool emptyNolock()const{
    // cleanup cache and check if we have queue elements
    cleanCacheNolock();
    if(cache_.size()>0)return false;

    // cache is empty, fill it up and check if we are still empty
    fillCacheNolock(true);
    return cache_.size()==0;
  }
  // fill cache
  // (must be called when having the lock)
  void fillCacheNolock(bool forceRefill)const{
    // trash cache and re-read if 'forceRefill' is set or if cache is empty
    if(forceRefill||cache_.empty()){
      std::list<fs::path>sortedFiles{detail::queue_support::getTsOrderedFiles(dir_)};
      cache_.swap(sortedFiles);
    }
  }
  // remove any file in cache which do not exist in dir_
  // (must be called when having the lock)
  void cleanCacheNolock()const{
    std::list<fs::path>tmpCache;
    for(auto const&p:cache_)
      if(fs::exists(p))tmpCache.push_back(p);
    swap(tmpCache,cache_);
  }
  // get size of queue
  // (lock must be held when calling this function)
  size_t sizeNolock()const{
    // must forcefill cache to know the queue size
    fillCacheNolock(true);
    return cache_.size();
  }
  // get oldest file + manage cache_ if needed
  // (lock must be held when calling this function)
  std::pair<bool,fs::path>nextFileNolock()const{
    // clean cache and only fill cache if it's empty
    cleanCacheNolock();
    fillCacheNolock(false);
    if(cache_.empty())return std::make_pair(false,fs::path());
    std::pair<bool,fs::path>ret{std::make_pair(true,cache_.front())};
    cache_.pop_front();
    return ret;
  }
  // user specified characteristics of queue
  std::string qname_;
  std::size_t maxsize_;
  fs::path dir_;
  bool removelocks_;

  // serialization/deserialization functions
  DESER deser_;
  SERIAL serial_;

  // state of queue
  bool deq_enabled_=true;
  bool enq_enabled_=true;

  // mutex/condition variables
  mutable boost::interprocess::named_mutex ipcmtx_;
  mutable boost::interprocess::named_condition ipcond_;

  // cache of message
  mutable std::list<fs::path>cache_;
};
}
}
#endif
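
The cache handling in nextFileNolock() can be looked at in isolation. Below is a minimal sketch of the same idea, assuming C++17 and std::filesystem instead of boost::filesystem. The names PathCache and Lister are made up for this illustration and are not part of the queue code; the lister callback stands in for getTsOrderedFiles(dir_).

```cpp
#include <cassert>
#include <filesystem>
#include <fstream>
#include <functional>
#include <list>
#include <string>
#include <utility>

namespace fs = std::filesystem;

// Hypothetical stand-in for the cache handling in the directory queue.
// 'Lister' plays the role of getTsOrderedFiles(dir_) (oldest file first).
class PathCache {
public:
  using Lister = std::function<std::list<fs::path>()>;
  explicit PathCache(Lister lister) : lister_(std::move(lister)) {}

  // Return (true, oldest path) or (false, empty path) if nothing is queued.
  std::pair<bool, fs::path> next() {
    prune();
    if (cache_.empty()) cache_ = lister_();  // refill only when drained
    if (cache_.empty()) return {false, fs::path()};
    fs::path p = cache_.front();
    cache_.pop_front();
    return {true, p};
  }

private:
  // Drop cached paths whose files have disappeared
  // (for example because another process consumed them).
  void prune() {
    cache_.remove_if([](fs::path const& p) { return !fs::exists(p); });
  }

  Lister lister_;
  std::list<fs::path> cache_;
};
```

As in the listing above, the sketch only hands out a path. It relies on the caller removing the file once it has been read, otherwise the same file shows up again at the next refill.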

The detail/queue_support.hpp file

Here is a straight listing of the "detail/queue_support.hpp" file:

#ifndef __QUEUE_SUPPPORT_H__
#define __QUEUE_SUPPPORT_H__

#include <algorithm>
#include <string>
#include <map>
#include <list>
#include <iostream>
#include <utility>
#include <fstream>
#include <memory>
#include <stdexcept>
#include <cstdio>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <boost/lexical_cast.hpp>
#include <boost/filesystem.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>

namespace boost{
namespace asio{
namespace detail{
namespace queue_support{

namespace fs=boost::filesystem;
namespace ipc=boost::interprocess;
namespace io=boost::iostreams;

namespace{
// get all filenames in a directory, sorted by last write time (oldest first)
std::list<fs::path>getTsOrderedFiles(fs::path const&dir){
  // need a map with key=time, value=filename
  typedef std::multimap<time_t,fs::path>time_file_map_t;
  time_file_map_t time_file_map;

  // insert all files together with time as key into map
  fs::directory_iterator dir_end_iter;
  for(fs::directory_iterator it(dir);it!=dir_end_iter;++it){
    if(!is_regular_file(*it))continue;
    time_t time_stamp(last_write_time(*it));
    time_file_map.insert(time_file_map_t::value_type(time_stamp,*it));
  }
  // create return list and return
  std::list<fs::path>ret;
  for(auto const&f:time_file_map)ret.push_back(f.second);
  return ret;
}
// remove lock variables for queue
// (name of lock variables are computed from the path to the queue directory)
void removeLockVariables(std::string const&name){
  ipc::named_mutex::remove(name.c_str());
  ipc::named_condition::remove(name.c_str());
}
// helper function for serialising an object
// (lock must be held when calling this function)
template<typename T,typename SERIAL>
void write(T const&t,fs::path const&dir,SERIAL serial){
  // create a unique filename, open file for writing and serialise object to file (user defined function)
  // (serialization function is a user supplied function - see ctor)
  std::string const id{boost::lexical_cast<std::string>(boost::uuids::random_generator()())};
  fs::path fullpath{dir/id};
  std::ofstream os{fullpath.string(),std::ofstream::binary};
  if(!os)throw std::runtime_error(std::string("asio::detail::dirqueue_support::write: could not open file: ")+fullpath.string());
  serial(os,t);
  os.close();
}
// helper function for deserialising an object
// (lock must be held when calling this function)
template<typename T,typename DESER>
T read(fs::path const&fullpath,DESER deser){
  // open input stream, deserialize stream into an object and remove file
  // (deserialization function is a user supplied function - see ctor)
  std::ifstream is{fullpath.string(),std::ifstream::binary};
  if(!is)throw std::runtime_error(std::string("asio::detail::dirqueue_support::read: could not open file: ")+fullpath.string());
  T ret{deser(is)};
  is.close();
  std::remove(fullpath.string().c_str());
  return ret;
}
// close a file descriptor (retries if interrupted by a signal)
// (returns 0 on success, otherwise the errno value set by close())
int eclose(int fd,bool throwExcept){
  int stat;
  while((stat=::close(fd))<0&&errno==EINTR);
  if(stat==0)return 0;
  // only look at errno when close() actually failed
  int const err=errno;
  if(throwExcept)throw std::runtime_error(std::string("eclose: failed closing fd: ")+::strerror(err));
  return err;
}
}
}
}
}
}
#endif
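
The write() and read() helpers above only assume that the user supplied functions can be called as serial(os,t) and deser(is). Here is a small round trip sketch of that contract, assuming C++17 and std::filesystem rather than boost::filesystem. The names writeItem and readItem are invented for this example, and unlike write() the sketch takes the full file name instead of generating a UUID.

```cpp
#include <cassert>
#include <filesystem>
#include <fstream>
#include <istream>
#include <ostream>
#include <stdexcept>
#include <string>

namespace fs = std::filesystem;

// Serialise 't' into 'file' using a caller supplied
// serial(std::ostream&, T const&) function.
template <typename T, typename Serial>
void writeItem(T const& t, fs::path const& file, Serial serial) {
  std::ofstream os{file, std::ofstream::binary};
  if (!os) throw std::runtime_error("writeItem: could not open file: " + file.string());
  serial(os, t);
}

// Deserialise one item using a caller supplied deser(std::istream&)
// function, then remove the file so it cannot be dequeued twice.
template <typename T, typename Deser>
T readItem(fs::path const& file, Deser deser) {
  std::ifstream is{file, std::ifstream::binary};
  if (!is) throw std::runtime_error("readItem: could not open file: " + file.string());
  T ret(deser(is));
  is.close();
  fs::remove(file);
  return ret;
}
```

A queue of int could then use [](std::ostream&os,int const&i){os<<i;} as serialiser and a lambda reading an int from a std::istream as deserialiser.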

The simple_queue file

The simple_queue which I showed in a previous blog entry has been changed to support the new features of the sender and listener. The simple_queue is an in-memory queue which stores and returns objects by value and does not require serialisation of queue items.

Here is a straight listing of the simple_queue file:

#ifndef __SIMPLE_QUEUE_H__
#define __SIMPLE_QUEUE_H__
#include <utility>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <boost/asio/error.hpp>
#include <boost/system/error_code.hpp>
namespace boost{
namespace asio{

// a simple thread safe queue used as default queue in boost::asio::queue_listener
template<typename T,typename Container=std::queue<T>>
class simple_queue{
public:
  // typedef for value stored in queue
  // (need this so we can create an item with default ctor)
  using value_type=T;

  // ctors,assign,dtor
  simple_queue(std::size_t maxsize):maxsize_(maxsize){}
  simple_queue(simple_queue const&)=delete;
  simple_queue(simple_queue&&)=default;
  simple_queue&operator=(simple_queue const&)=delete;
  simple_queue&operator=(simple_queue&&)=default;
  ~simple_queue()=default;

  // put a message into queue
  bool enq(T t,boost::system::error_code&ec){
    std::unique_lock<std::mutex>lock(mtx_);
    cond_.wait(lock,[&](){return !enq_enabled_||q_.size()<maxsize_;});
    if(!enq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return false;
    }
    q_.push(t);
    cond_.notify_all();
    ec=boost::system::error_code();
    return true;
  }
  // put a message into queue - timeout if waiting too long
  bool timed_enq(T t,std::size_t ms,boost::system::error_code&ec){
    std::unique_lock<std::mutex>lock(mtx_);
    bool tmo=!cond_.wait_for(lock,std::chrono::milliseconds(ms),[&](){return !enq_enabled_||q_.size()<maxsize_;});
    if(tmo){
      ec=boost::asio::error::timed_out;
      return false;
    }
    if(!enq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return false;
    }
    q_.push(t);
    cond_.notify_all();
    ec=boost::system::error_code();
    return true;
  }
  // wait until we can put a message in queue
  bool wait_enq(boost::system::error_code&ec){
    std::unique_lock<std::mutex>lock(mtx_);
    cond_.wait(lock,[&](){return !enq_enabled_||q_.size()<maxsize_;});
    if(!enq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return false;
    }
    cond_.notify_all();
    ec=boost::system::error_code();
    return true;
  }
  // wait until we can put a message in queue - timeout if waiting too long
  bool timed_wait_enq(std::size_t ms,boost::system::error_code&ec){
    std::unique_lock<std::mutex>lock(mtx_);
    bool tmo=!cond_.wait_for(lock,std::chrono::milliseconds(ms),[&](){return !enq_enabled_||q_.size()<maxsize_;});
    if(tmo){
      ec=boost::asio::error::timed_out;
      return false;
    }
    if(!enq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return false;
    }
    cond_.notify_all();
    ec=boost::system::error_code();
    return true;
  }
  // dequeue a message (return.first == false if deq() was disabled)
  std::pair<bool,T>deq(boost::system::error_code&ec){
    std::unique_lock<std::mutex>lock(mtx_);
    cond_.wait(lock,[&](){return !deq_enabled_||!q_.empty();});

    // if deq is disabled, abort
    if(!deq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return std::make_pair(false,T{});
    }
    // we have a message: copy it and remove it from the queue
    std::pair<bool,T>ret{std::make_pair(true,q_.front())};
    q_.pop();
    cond_.notify_all();
    ec=boost::system::error_code();
    return ret;
  }
  // dequeue a message (return.first == false if deq() was disabled) - timeout if waiting too long
  std::pair<bool,T>timed_deq(std::size_t ms,boost::system::error_code&ec){
    std::unique_lock<std::mutex>lock(mtx_);
    bool tmo=!cond_.wait_for(lock,std::chrono::milliseconds(ms),[&](){return !deq_enabled_||!q_.empty();});

    // return if we timed out or if deq is disabled
    if(tmo){
      ec=boost::asio::error::timed_out;
      return std::make_pair(false,T{});
    }
    if(!deq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return std::make_pair(false,T{});
    }
    // we have a message: copy it and remove it from the queue
    std::pair<bool,T>ret{std::make_pair(true,q_.front())};
    q_.pop();
    cond_.notify_all();
    ec=boost::system::error_code();
    return ret;
  }
  // wait until we can retrieve a message from queue
  bool wait_deq(boost::system::error_code&ec){
    std::unique_lock<std::mutex>lock(mtx_);
    cond_.wait(lock,[&](){return !deq_enabled_||q_.size()>0;});
    if(!deq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return false;
    }
    cond_.notify_all();
    ec=boost::system::error_code();
    return true;
  }
  // wait until we can retrieve a message from queue - timeout if waiting too long
  bool timed_wait_deq(std::size_t ms,boost::system::error_code&ec){
    std::unique_lock<std::mutex>lock(mtx_);
    bool tmo=!cond_.wait_for(lock,std::chrono::milliseconds(ms),[&](){return !deq_enabled_||q_.size()>0;});
    if(tmo){
      ec=boost::asio::error::timed_out;
      return false;
    }
    if(!deq_enabled_){
      ec=boost::asio::error::operation_aborted;
      return false;
    }
    cond_.notify_all();
    ec=boost::system::error_code();
    return true;
  }
  // cancel deq operations (will also release blocking threads)
  void disable_deq(bool disable){
    std::unique_lock<std::mutex>lock(mtx_);
    deq_enabled_=!disable;
    cond_.notify_all();
  }
  // cancel enq operations (will also release blocking threads)
  void disable_enq(bool disable){
    std::unique_lock<std::mutex>lock(mtx_);
    enq_enabled_=!disable;
    cond_.notify_all();
  }
  // set max size of queue
  void set_maxsize(std::size_t maxsize){
    std::unique_lock<std::mutex>lock(mtx_);
    maxsize_=maxsize;
    cond_.notify_all();
  }
  // check if queue is empty
  bool empty()const{
    std::unique_lock<std::mutex>lock(mtx_);
    return q_.empty();
  }
  // check if queue is full
  bool full()const{
    std::unique_lock<std::mutex>lock(mtx_);
    return q_.size()>=maxsize_;
  }
  // get #of items in queue
  std::size_t size()const{
    std::unique_lock<std::mutex>lock(mtx_);
    return q_.size();
  }
  // get max items in queue
  std::size_t maxsize()const{
    std::unique_lock<std::mutex>lock(mtx_);
    return maxsize_;
  }
private:
  std::size_t maxsize_;
  mutable std::mutex mtx_;
  mutable std::condition_variable cond_;
  bool deq_enabled_=true;
  bool enq_enabled_=true;
  Container q_;
};
}
}
#endif
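
To see how the three outcomes of timed_deq() (message, timeout, aborted) fall out of a single wait_for() call, here is a cut-down sketch using only the standard library. MiniQueue and DeqStatus are hypothetical names used for illustration; the enum replaces the boost::system::error_code of the real simple_queue, and the size limit is left out.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

enum class DeqStatus { ok, timed_out, aborted };

// Cut-down, hypothetical stand-in for simple_queue (no Boost error codes,
// no maximum size).
template <typename T>
class MiniQueue {
public:
  void enq(T t) {
    std::lock_guard<std::mutex> lock(mtx_);
    q_.push(std::move(t));
    cond_.notify_all();
  }
  // Wait up to 'ms' milliseconds for an item.
  std::pair<DeqStatus, T> timed_deq(std::size_t ms) {
    std::unique_lock<std::mutex> lock(mtx_);
    // wait_for() returns the final value of the predicate, so 'false'
    // means we timed out with the queue still empty and deq still enabled
    bool ready = cond_.wait_for(lock, std::chrono::milliseconds(ms),
                                [&] { return !deq_enabled_ || !q_.empty(); });
    if (!ready) return {DeqStatus::timed_out, T{}};
    if (!deq_enabled_) return {DeqStatus::aborted, T{}};
    T t = std::move(q_.front());
    q_.pop();
    return {DeqStatus::ok, std::move(t)};
  }
  // Disabling deq wakes up every waiting thread so it can return 'aborted'.
  void disable_deq(bool disable) {
    std::lock_guard<std::mutex> lock(mtx_);
    deq_enabled_ = !disable;
    cond_.notify_all();
  }

private:
  std::mutex mtx_;
  std::condition_variable cond_;
  bool deq_enabled_ = true;
  std::queue<T> q_;
};
```

Called on an empty queue, timed_deq(10) reports a timeout. After enq(7) it returns the item, and after disable_deq(true) it reports aborted without waiting.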