Friday, November 8, 2013

Compact C++11 code for Oracle - Part III

Newton was a genius, but not because of the superior computational power of his brain. Newton’s genius was, on the contrary, his ability to simplify, idealize, and streamline the world so that it became, in some measure, tractable to the brains of perfectly ordinary men.

-- G. Weinberg, An Introduction to General Systems Thinking

In this blog entry I'll show a utility which simplifies modifications to data in an Oracle database table. However, before starting I'll quickly show a few changes to the code from the previous entry.

Changes to code from previous blog entries

First, based on a few comments from colleagues, I renamed the classes. Specifically, I now use source as opposed to collection, mostly because my previous OcciSelectCollection was not really a collection. I also added a constructor which creates an OCCI environment and a connection in order to simplify quick prototyping. Finally, I made the collection (now called occi_source) movable.

Here is a small program using the renamed classes and the new constructor printing data specified by a select statement to standard out:

#include "occi_utils.h"
#include <iostream>
#include <string>
#include <tuple>
using namespace std;

// --- main test program
int main(){
  // auth credentials
  occi_auth const auth{"hans","ewetz","mydb"};

  // sql statement, row and bind types, bind data
  typedef tuple<int,string,string,string>row_t;
  typedef tuple<string>bind_t;
  string sql{"select rownum,dbid,tab,cutoff_dt from MT_DBSYNC_CUTOFF_DT where dbid=:1"};
  bind_t bind{"Policy"};

  // read data specified by select statement
  occi_source<row_t,bind_t>rows{auth,sql,bind};
  for(auto r:rows)cout<<r<<endl;
}

Letting the occi_source manage all OCCI resources is not a realistic choice for most applications. However, when writing code for tests and prototypes, pushing as much OCCI specific code to the occi_source speeds up development significantly.
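The range-based for loop in the program above streams a whole tuple with cout<<r, which only compiles if an operator<< for std::tuple is available. That operator is not shown in this entry; presumably it comes from the utility header. Here is a minimal C++11 sketch of how such an operator could look. The names tuple_printer and format_tuple, and the [a,b,c] output format, are assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <iostream>
#include <sstream>
#include <string>
#include <tuple>

// Recursive helper: prints element I of an N-element tuple, then recurses.
template<std::size_t I,std::size_t N>
struct tuple_printer{
  template<typename Tuple>
  static void print(std::ostream&os,Tuple const&t){
    if(I>0)os<<",";
    os<<std::get<I>(t);
    tuple_printer<I+1,N>::print(os,t);
  }
};
// Recursion ends when I == N.
template<std::size_t N>
struct tuple_printer<N,N>{
  template<typename Tuple>
  static void print(std::ostream&,Tuple const&){}
};
// Stream operator for any std::tuple whose elements are streamable.
template<typename ... Args>
std::ostream&operator<<(std::ostream&os,std::tuple<Args ...>const&t){
  os<<"[";
  tuple_printer<0,sizeof ...(Args)>::print(os,t);
  return os<<"]";
}
// Hypothetical helper for demonstration: format a tuple as a string.
template<typename ... Args>
std::string format_tuple(std::tuple<Args ...>const&t){
  std::ostringstream os;
  os<<t;
  return os.str();
}
```

With this in place, a row of type tuple<int,string,string,string> can be written to any ostream in one expression.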

Modelling database access

The implementation of the occi_input_iterator was based on the boost::iterator_facade. A better alternative would have been to use boost::function_input_iterator. In the next blog entry I'll revisit the implementation and will most likely change it to use boost::function_input_iterator.

It's worth noticing that there are other ways to model access to a database. For example, the concept of streams could be used as opposed to the use of source. Yet another way would be to create a thick layer which models a generic database interface.

I'll continue to stick with a thin layer over OCCI which hides the actual database access but lets the user have control over OCCI resources. Specifically I don't want to invent new ways of specifying SQL statements and I definitely don't want my utilities to look inside SQL statements. Anything related to parsing SQL statements I'll gladly pass along to Oracle.

I will continue to stay close to core C++11 features such as tuples and strings instead of building special classes for rows, bind variables, SQL statements etc. For those interested in more generic alternatives, the Database Template Library (DTL) may be of interest.

An iterator for modifying data

Now, let's take a look at how to model modifications to data in an Oracle table. I'll follow along the same lines as before where I used an occi_source for managing OCCI resources (now I'll use an occi_sink) and an iterator for executing SQL statements. However, this time around I'll use the boost function_output_iterator as the base for implementing an output iterator.

The model where I assign bind variables to an iterator worked fine for select statements. It turns out that the model is OK for modification statements also. However, as you'll see later the code executing a modification statement without bind variables looks a little awkward.

OK, let's start with a piece of code showing what I would like to be able to do:

#include "occi_tools.h"
#include <string>
#include <algorithm>
#include <tuple>
#include <vector>

using namespace std;

// --- test insert
void update(occi_auth const&auth){
  typedef tuple<int,string>bind_t;
  string sql{"insert into names (id,name,isrt_tmstmp) values(:1,:2,sysdate)"};
  occi_sink<bind_t>sink(auth,sql);
  vector<bind_t>b{bind_t{1,"hans"},bind_t{2,"ingvar"}, bind_t{3,"valdemar"}};
  copy(b.begin(),b.end(),sink.begin());
}
// --- main test program
int main(){
  // auth credentials
  occi_auth const auth{"hans","ewetz","mydb"};
  update(auth);
}

The code snippet inserts three rows into the table names by creating an occi_sink, getting an output iterator from it, and finally copying a vector of tuples corresponding to the bind variables to the iterator. In the sample code I let the sink manage all OCCI resources, including the environment and the connection. As will become clear soon, I'll define a set of occi_sink constructors which allow me to manage some of the OCCI resources myself.

A boost::function_output_iterator requires a function which will be called each time an item is assigned to the iterator. I'll write it as a function which simply delegates back to the iterator:

// forward decl
template<typename Bind>class occi_output_iterator;
  
// function called for each modification
// (function delegates back to iterator to do work)
template<typename Bind>
class occi_unary_output:std::unary_function<Bind const&,void>{
public:
  // ctors
  occi_unary_output(occi_output_iterator<Bind>*it):it_(it){}

  // modification function
  void operator()(Bind const&bind){
    it_->operator()(bind);
  }
private:
  occi_output_iterator<Bind>*it_;
};
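To make the mechanism concrete without pulling in boost or OCCI, here is a minimal sketch of an output iterator that calls a function for every assigned item. It is a simplified stand-in written with the standard library only, not the boost implementation (boost, for instance, uses a proxy object for the assignment). The names fn_output_iterator, record_names and run_example are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <string>
#include <vector>

// Simplified stand-in for a function output iterator (assumption: not boost's code).
template<typename F>
class fn_output_iterator{
public:
  using iterator_category=std::output_iterator_tag;
  using value_type=void;
  using difference_type=std::ptrdiff_t;
  using pointer=void;
  using reference=void;

  explicit fn_output_iterator(F f):f_(f){}

  // dereference and increment are no-ops returning the iterator itself
  fn_output_iterator&operator*(){return *this;}
  fn_output_iterator&operator++(){return *this;}
  fn_output_iterator&operator++(int){return *this;}

  // assigning a value forwards it to the function
  template<typename T>
  fn_output_iterator&operator=(T const&value){f_(value);return *this;}
private:
  F f_;
};
// Functor called once per assigned item; here it only records the item.
struct record_names{
  explicit record_names(std::vector<std::string>*out):out_(out){}
  void operator()(std::string const&name)const{out_->push_back("insert:"+name);}
  std::vector<std::string>*out_;
};
// Copy a few names through the iterator and return what the functor recorded.
inline std::vector<std::string>run_example(){
  std::vector<std::string>executed;
  std::vector<std::string>names{"hans","ingvar","valdemar"};
  std::copy(names.begin(),names.end(),fn_output_iterator<record_names>(record_names(&executed)));
  return executed;
}
```

Each `*it=value` performed by std::copy ends up as one call to the functor, which is the hook the real iterator uses to bind a row and execute the statement.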

The actual iterator is coded as follows:

// wrapper around boost::function_output_iterator
template<typename Bind>
class occi_output_iterator:public boost::function_output_iterator<occi_unary_output<Bind>>{
friend class occi_unary_output<Bind>;
public:
  // typedef for simpler declarations (tuple<size_t,size_t,...> with as many elements as Bind)
  using Size=typename uniform_tuple_builder<std::tuple_size<Bind>::value,std::size_t>::type;

  // ctors, assign, dtor
  occi_output_iterator(oracle::occi::Connection*conn,std::string const&sql,std::size_t batchsize=1,Size const&size=Size()):
      boost::function_output_iterator<occi_unary_output<Bind>>(occi_unary_output<Bind>(this)),
      conn_(conn),sql_(sql),batchsize_(batchsize),nwaiting_(0){

    // reject an invalid batch size before touching any OCCI resources
    if(batchsize_==0){
      throw std::runtime_error("invalid batchsize: 0 while constructing occi_output_iterator");
    }
    // create statement and set max number of batched iterations
    stmt_=std::shared_ptr<oracle::occi::Statement>{conn_->createStatement(sql_),occi_stmt_deleter(conn_)};
    stmt_->setMaxIterations(batchsize_);

    // set max size of bind variables which have variable length
    if(batchsize_>1){
      // set size for variable size bind variables (will throw exception if size==0 for variable size bind variable)
      occi_bind_sizer<Bind>sizer{stmt_};
      apply_with_index_template(sizer,size);
    }
    // create binder object
    binder_=occi_data_binder(stmt_);
  }
  occi_output_iterator(occi_output_iterator const&)=default;
  occi_output_iterator(occi_output_iterator&&)=default;
  occi_output_iterator&operator=(occi_output_iterator const&)=default;
  occi_output_iterator&operator=(occi_output_iterator&&)=default;
  ~occi_output_iterator()=default;

  // explicitly execute buffered statements
  void flush(){flushAux();}
private:
  // modification function
  void operator()(Bind const&bind){
    // check if we need to add the previous row, bind the new row and check if we need to flush (execute)
    ++nwaiting_;
    if(nwaiting_>1)stmt_->addIteration();
    apply_with_index(binder_,bind);
    if(nwaiting_==batchsize_)flushAux();
  }
  // flush remaining statements
  void flushAux(){
    if(nwaiting_>0){
      stmt_->executeUpdate();
      nwaiting_=0;
    }
  }
private:
  oracle::occi::Connection*conn_;
  std::string sql_;   // stored by value so the iterator cannot outlive the sql text
  std::shared_ptr<oracle::occi::Statement>stmt_;
  occi_data_binder binder_;
  std::size_t batchsize_;
  std::size_t nwaiting_;
};

The code requires some explanation. OCCI provides a feature that batches requests and sends them to the server in one go. The batchsize_ and nwaiting_ members keep track of when it's time to flush a batch of statements to the server. A flush method is provided so a user can explicitly flush remaining statements. For example, if the batch size is set to 3 and 4 items have been assigned to the iterator, the first 3 have already been sent and 1 statement is still buffered. The user can then call flush explicitly, forcing the iterator to send the remaining statement to the server.
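The counting logic can be tried out in isolation. The following sketch mirrors only the batchsize_/nwaiting_ bookkeeping of the iterator; instead of calling executeUpdate() it records how many rows each simulated round trip carried. The class batch_counter and the function run_batch_example are hypothetical names, and no OCCI calls are involved.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Stand-in for the batching bookkeeping (assumption: no real database access).
class batch_counter{
public:
  explicit batch_counter(std::size_t batchsize):batchsize_(batchsize),nwaiting_(0){
    if(batchsize_==0)throw std::invalid_argument("invalid batchsize: 0");
  }
  // called once per assigned row
  void add(){
    ++nwaiting_;
    if(nwaiting_==batchsize_)flush();
  }
  // send buffered rows (if any) in one simulated round trip
  void flush(){
    if(nwaiting_>0){
      roundtrips_.push_back(nwaiting_);
      nwaiting_=0;
    }
  }
  std::size_t waiting()const{return nwaiting_;}
  std::vector<std::size_t>const&roundtrips()const{return roundtrips_;}
private:
  std::size_t batchsize_;
  std::size_t nwaiting_;
  std::vector<std::size_t>roundtrips_;
};
// batch size 3 and 4 rows: one full batch is sent, one row waits for flush()
inline std::vector<std::size_t>run_batch_example(){
  batch_counter b(3);
  for(int i=0;i<4;++i)b.add();
  b.flush();
  return b.roundtrips();
}
```

Without the final flush() the fourth row would stay buffered, which is why the iterator exposes flush to the user.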

There is a little more to managing batching of statements. OCCI must know beforehand the maximum size of a statement. To calculate the size OCCI requires the maximum size of any variable length bind variables. For fixed length bind variables OCCI can calculate the size. To pass this information through the iterator to OCCI I use a tuple having the same number of items but each item having the type size_t. A user must then set the maximum length of a variable length bind variable in the tuple. The functionality for informing OCCI about the size of a variable length bind variable is implemented in occi_bind_sizer:

// set maximum size a bind variable can have (used in update/insert/delete statements)
template<typename T>struct occi_bind_sizer_aux;
template<>struct occi_bind_sizer_aux<int>{
  static void setsize(std::size_t ind,std::shared_ptr<oracle::occi::Statement>stmt,std::size_t size){
    // nothing to do - not a variable size type
  }
};
template<>struct occi_bind_sizer_aux<std::string>{
  static void setsize(std::size_t ind,std::shared_ptr<oracle::occi::Statement>stmt,std::size_t size){
    // don't allow string length to have max size == 0
    if(size==0)throw std::runtime_error("invalid size: 0 for std::string bind variable while setting 'stmt->setMaxParamSize(ind,size)'");
    stmt->setMaxParamSize(ind+1,size);
  }
};
// TODO: add more types to sizer 
// ...
template<typename ... Args>class occi_bind_sizer;
template<typename ... Args>
class occi_bind_sizer<std::tuple<Args ...>>{
public:
  // get bind type
  using Bind=std::tuple<Args ...>;

  occi_bind_sizer(std::shared_ptr<oracle::occi::Statement>stmt):stmt_(stmt){}
  occi_bind_sizer()=default;
  occi_bind_sizer(occi_bind_sizer const&)=default;
  occi_bind_sizer(occi_bind_sizer&&)=default;
  occi_bind_sizer&operator=(occi_bind_sizer const&)=default;
  occi_bind_sizer&operator=(occi_bind_sizer&&)=default;
  ~occi_bind_sizer()=default;
  template<std::size_t Ind>
  void apply(std::size_t size){
    using T=typename std::decay<decltype(std::get<Ind>(Bind()))>::type;
    occi_bind_sizer_aux<T>::setsize(Ind,stmt_,size);
  }
private:
  std::shared_ptr<oracle::occi::Statement>stmt_;
};

That's all there is to the iterator implementation.
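The Size typedef in the iterator relies on uniform_tuple_builder, which is not shown in this entry. A minimal sketch of how such a builder could be written in C++11 follows. The name uniform_tuple_builder_sketch and its recursive layout are assumptions; the real utility may be implemented differently.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <tuple>
#include <type_traits>

// Builds std::tuple<T,T,...,T> with N elements by adding one T per recursion step.
template<std::size_t N,typename T,typename ... Ts>
struct uniform_tuple_builder_sketch{
  using type=typename uniform_tuple_builder_sketch<N-1,T,T,Ts ...>::type;
};
// Recursion ends at N == 0: the accumulated pack becomes the tuple.
template<typename T,typename ... Ts>
struct uniform_tuple_builder_sketch<0,T,Ts ...>{
  using type=std::tuple<Ts ...>;
};

// A bind tuple with two elements yields a size tuple with two size_t elements.
using example_bind_t=std::tuple<int,std::string>;
using example_size_t=uniform_tuple_builder_sketch<std::tuple_size<example_bind_t>::value,std::size_t>::type;
static_assert(std::is_same<example_size_t,std::tuple<std::size_t,std::size_t>>::value,
              "expected one size_t per bind variable");
```

Because the size tuple is derived from the bind tuple, the compiler guarantees that there is exactly one size entry per bind variable.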

An occi_sink

Once the idea of assigning a tuple representing the bind variables to an iterator is clear, the implementation is straightforward and the iterator can be used as is. However, I started to write the code to show that it was possible to write a small C++ library which would be a typesafe alternative to using PERL for DB access. Using the occi_output_iterator directly is simple but not simple enough. I'll make it simple enough by adding a sink whose main responsibility is to create iterators and manage OCCI resources.

The implementation of occi_sink is rather long mostly because it's meant to provide a simple to use interface – not because it's complicated. A large part of the code consists of constructors and move support. The occi_sink is movable but not copyable.

Here is the code:

// sink 
template<typename Bind=std::tuple<>>
class occi_sink{
public:
  // typedef for simpler declarations (tuple<size_t,size_t,...> with as many elements as Bind)
  using Size=typename uniform_tuple_builder<std::tuple_size<Bind>::value,std::size_t>::type;

  // enum for controlling commit, rollback or do nothing
  enum commit_t:int{Rollback=0,Commit=1,Nop=2};
  // typedefs
  typedef typename occi_output_iterator<Bind>::value_type value_type;
  typedef typename occi_output_iterator<Bind>::pointer pointer;
  typedef typename occi_output_iterator<Bind>::reference reference;
  typedef occi_output_iterator<Bind>iterator;

  // ctor taking authentication (commit default == true since sink manages resources)
  explicit occi_sink(occi_auth const&auth,std::string const&sql,std::size_t batchsize=1,Size const&size=Size(),commit_t commit=Commit):
      conn_(nullptr),connpool_(nullptr),env_(nullptr),auth_(auth),sql(sql),closeConn_(true),
      releaseConn_(false),terminateEnv_(true),batchsize_(batchsize),size_(size),commit_(commit){
    check_batchsize(batchsize_);
    env_=oracle::occi::Environment::createEnvironment(oracle::occi::Environment::DEFAULT);
    conn_=env_->createConnection(std::get<0>(auth_),std::get<1>(auth_),std::get<2>(auth_));
  }
  // ctor taking environment + authentication (commit default == true since sink manages resources)
  explicit occi_sink(oracle::occi::Environment*env,occi_auth const&auth,std::string const&sql,std::size_t batchsize=1,Size const&size=Size(),commit_t commit=Commit):
      conn_(nullptr),connpool_(nullptr),env_(env),auth_(auth),sql(sql),closeConn_(true),
      releaseConn_(false),terminateEnv_(false),batchsize_(batchsize),size_(size),commit_(commit){
    check_batchsize(batchsize_);
    conn_=env_->createConnection(std::get<0>(auth_),std::get<1>(auth_),std::get<2>(auth_));
  }
  // ctor taking an open connection (commit default == false since sink does not manage resources)
  explicit occi_sink(oracle::occi::Connection*conn,std::string const&sql,std::size_t batchsize=1,Size const&size=Size(),commit_t commit=Nop):
      conn_(conn),connpool_(nullptr),env_(nullptr),sql(sql),closeConn_(false),
      releaseConn_(false),terminateEnv_(false),batchsize_(batchsize),size_(size),commit_(commit){
    check_batchsize(batchsize_);
  }
  // ctor taking a stateless connection pool (commit default == false since sink does not manage resources)
  explicit occi_sink(oracle::occi::StatelessConnectionPool*connpool,std::string const&sql,std::size_t batchsize=1,Size const&size=Size(),commit_t commit=Nop):
      conn_(nullptr),connpool_(connpool),env_(nullptr),sql(sql),closeConn_(false),
      releaseConn_(true),terminateEnv_(false),batchsize_(batchsize),size_(size),commit_(commit){
    check_batchsize(batchsize_);
    conn_=connpool_->getConnection();
  }
  // ctors, assign (movable but not copyable)
  occi_sink()=delete;
  occi_sink(occi_sink const&)=delete;
  occi_sink(occi_sink&&s):
      conn_(s.conn_),connpool_(s.connpool_),env_(s.env_),
      auth_(s.auth_),sql(s.sql),batchsize_(s.batchsize_),size_(s.size_),
      closeConn_(s.closeConn_),releaseConn_(s.releaseConn_),terminateEnv_(s.terminateEnv_),
      commit_(s.commit_){
    // reset all relevant state
    reset_state(std::move(s));
  }
  occi_sink&operator=(occi_sink const&)=delete;
  occi_sink&operator=(occi_sink&&s){
    // swap state with the parameter; the resources previously held by *this
    // are released when s is destroyed
    swap(s);
    return *this;
  }
  // dtor - close occi resources if needed
  ~occi_sink(){
    // check if we should commit
    if(commit_==Commit)conn_->commit();else
    if(commit_==Rollback)conn_->rollback();

    // take care of connection
    if(closeConn_)env_->terminateConnection(conn_);else
    if(releaseConn_)connpool_->releaseConnection(conn_);

    // take care of environment
    if(terminateEnv_)oracle::occi::Environment::terminateEnvironment(env_);
  }
  // get begin iterator (an output iterator needs no end)
  iterator begin()const{return iterator(conn_,sql,batchsize_,size_);}

  // swap function
  void swap(occi_sink&s)noexcept(true){
    std::swap(s.conn_,conn_);std::swap(s.connpool_,connpool_);std::swap(s.env_,env_);
    std::swap(s.closeConn_,closeConn_);std::swap(s.releaseConn_,releaseConn_);std::swap(s.terminateEnv_,terminateEnv_);
    std::swap(s.auth_,auth_);std::swap(s.sql,sql);std::swap(s.batchsize_,batchsize_);std::swap(s.size_,size_);std::swap(s.commit_,commit_);
  }
private:
  // check if batchsize is valid
  void check_batchsize(std::size_t batchsize){
    if(batchsize==0)throw std::runtime_error("invalid batchsize: 0 while constructing occi_sink");
  }
  // reset state for a sink (only called with r-value)
  void reset_state(occi_sink&&s){
    s.conn_=nullptr;s.connpool_=nullptr;s.env_=nullptr;
    s.closeConn_=false;s.releaseConn_=false;s.terminateEnv_=false;
    s.auth_=occi_auth{};s.sql="";s.batchsize_=0;s.size_=Size{};s.commit_=Nop;
  }
  // occi resources
  oracle::occi::Connection*conn_;
  oracle::occi::StatelessConnectionPool*connpool_;
  oracle::occi::Environment*env_;

  // authentication + sql statement
  occi_auth auth_;
  std::string sql;
  std::size_t batchsize_;
  Size size_;

  // control of what to do with occi resources
  bool closeConn_;
  bool releaseConn_;
  bool terminateEnv_;
  commit_t commit_;
};
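The interplay between move support and the commit/rollback decision in the destructor is easier to see in a stripped-down form. The sketch below uses a fake connection that only logs calls, so it runs without OCCI. The names fake_connection, end_action, tx_guard and run_guard_example are hypothetical; the point is only that a moved-from object must do nothing when it is destroyed.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Fake connection that records calls (stand-in for an OCCI connection).
struct fake_connection{
  std::vector<std::string>log;
  void commit(){log.push_back("commit");}
  void rollback(){log.push_back("rollback");}
};
enum class end_action{Rollback,Commit,Nop};

// Movable, non-copyable guard that finishes the transaction in its destructor.
class tx_guard{
public:
  tx_guard(fake_connection*conn,end_action action):conn_(conn),action_(action){}
  tx_guard(tx_guard const&)=delete;
  tx_guard&operator=(tx_guard const&)=delete;
  tx_guard(tx_guard&&g)noexcept:conn_(g.conn_),action_(g.action_){
    // the moved-from guard must not touch the connection any more
    g.conn_=nullptr;g.action_=end_action::Nop;
  }
  tx_guard&operator=(tx_guard&&g)noexcept{
    // swap: the old state of *this is finished when g is destroyed
    std::swap(conn_,g.conn_);std::swap(action_,g.action_);
    return *this;
  }
  ~tx_guard(){
    if(conn_==nullptr)return;
    if(action_==end_action::Commit)conn_->commit();else
    if(action_==end_action::Rollback)conn_->rollback();
  }
private:
  fake_connection*conn_;
  end_action action_;
};
// Move a guard once; exactly one commit should be logged.
inline std::vector<std::string>run_guard_example(){
  fake_connection conn;
  {
    tx_guard a(&conn,end_action::Commit);
    tx_guard b(std::move(a));   // a is now inert
  }                             // b commits here, a does nothing
  return conn.log;
}
```

The same reasoning applies to the sink: after a move only one object may commit, close the connection or terminate the environment.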

A few examples

1 - Deleting without bind variables

Here is an example showing how to execute a delete statement without bind variables:

  string sql{"delete from names"};
  occi_sink<tuple<>>sink(auth,sql);
  *sink.begin()=tuple<>();

I mentioned earlier in this blog item that executing statements without bind variables would look a little awkward. I could probably build in some feature in the occi_sink to execute simple statements like the one here. However, I prefer to see it as a feature that there is a single consistent way of executing modification statements.

2 - Using plain iterator without sink

Here is a (slightly modified) production example using both an occi_source and an occi_output_iterator directly. The function is part of a class holding a connection in a shared_ptr attribute conn_:

// add a package to database
int PackageMetadata::addPackage(string const&name,string const&location,string const&clientid,string const&docformat){
  // get new package identifier
  using row_t=tuple<int>;
  occi_source<row_t>rows(conn_.get(),"select EXMT_PACKAGE_ID_SEQ.nextval from dual");
  int pkgid{get<0>(*rows.begin())};

  // insert package into db
  using bind_t=tuple<int,string,string,string,string>;
  string sql{"insert into EXMT_PACKAGE (ID,NAME,PKG_LOCATION,CLIENT_ID,DOC_FORMAT,ISRT_TMSTMP,LST_UPD_TMSTMP)VALUES(:1,:2,:3,:4,:5,sysdate,sysdate)"};
  occi_output_iterator<bind_t>it{conn_.get(),sql,1};
  *it=bind_t{pkgid,name,location,clientid,docformat};

  // return package identifier
  return pkgid;
}

3 - Batching statements

Finally, here is an example which sets the batch size, inserts some rows and flushes remaining (buffered) statements:

  typedef tuple<int,string>bind_t;
  typedef tuple<size_t,size_t>sizes_t;   // one max size per bind variable (avoids hiding std::size_t)
  string sql{"insert into names (id,name,isrt_tmstmp) values(:1,:2,sysdate)"};
  occi_sink<bind_t>sink(auth,sql,2,sizes_t(0,100));
  vector<bind_t>b{bind_t{1,"hans"},bind_t{2,"ingvar"},bind_t{3,"valdemar"}};
  auto it=sink.begin();
  copy(b.begin(),b.end(),it);
  it.flush();

Notice that when explicitly setting the batch size to a number greater than 1, statements will be buffered and flushed. When buffering statements, OCCI must have information available so it can calculate the maximum size of a single statement. Because of this OCCI must know the maximum size of any variable length bind variables. That information is passed to the occi_sink as a tuple holding size_t types. For non-variable length bind variables the corresponding length is ignored by the occi_output_iterator.
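How the size tuple is walked alongside the bind tuple can be sketched without OCCI. In the code below the sizer records (position,size) pairs instead of calling setMaxParamSize, fixed size types ignore their entry and std::string rejects a size of 0. The names size_log, sizer_aux, size_walker and collect_sizes are hypothetical, and the generic no-op fallback for all non-string types is a simplification of the per-type specializations shown earlier.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

// Records (1-based position,max size) pairs instead of talking to a statement.
using size_log=std::vector<std::pair<std::size_t,std::size_t>>;

// Per-type policy: by default a type is treated as fixed size and ignored.
template<typename T>struct sizer_aux{
  static void setsize(std::size_t,size_log&,std::size_t){}
};
// std::string is variable length and therefore requires a non-zero max size.
template<>struct sizer_aux<std::string>{
  static void setsize(std::size_t ind,size_log&log,std::size_t size){
    if(size==0)throw std::runtime_error("invalid size: 0 for std::string bind variable");
    log.push_back(std::make_pair(ind+1,size));
  }
};
// Walk the bind tuple type and the size tuple in lock step (plain C++11 recursion).
template<typename Bind,typename Size,std::size_t I,std::size_t N>
struct size_walker{
  static void walk(size_log&log,Size const&size){
    using T=typename std::tuple_element<I,Bind>::type;
    sizer_aux<T>::setsize(I,log,std::get<I>(size));
    size_walker<Bind,Size,I+1,N>::walk(log,size);
  }
};
template<typename Bind,typename Size,std::size_t N>
struct size_walker<Bind,Size,N,N>{
  static void walk(size_log&,Size const&){}
};
// Collect the sizes that would be passed on for the variable length bind variables.
template<typename Bind,typename Size>
size_log collect_sizes(Size const&size){
  static_assert(std::tuple_size<Bind>::value==std::tuple_size<Size>::value,"one size per bind variable");
  size_log log;
  size_walker<Bind,Size,0,std::tuple_size<Bind>::value>::walk(log,size);
  return log;
}
```

For a bind type tuple<int,string> and sizes (0,100), only the second position produces an entry; the 0 given for the int is never looked at.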

Conclusions

OCCI contains a large number of features. Here I only support a few of them such as batching of statements sent to the Oracle server. It's clear that there is room for supporting more of the OCCI features.

An alternative to adding more features would be to expose the statement type to a user. A user would then be able to retrieve the statement from an occi_output_iterator and call various OCCI methods on the statement. The risk here is of course that it would interfere with the function of the occi_output_iterator. Until there is time to analyse what should be added (and possibly removed) I'll leave the wrappers alone.

Even though it took a little while before I had time to punch out the blog entries, the code was written in a hurry. I'm sure that there are issues and bugs in the code. I'm also sure that some of the design decisions were not optimal. For example, some constructors take default parameters and I have a feeling that I didn't declare them in the correct order when I last used the code.

Overall I'm pretty happy that by using some of the basic C++11 features such as tuples and variadic parameters it was possible to write a thin, relatively non-intrusive wrapper around OCCI, making Oracle access in C++ as simple as Oracle access in PERL.

I'll probably write one more entry about this topic once I've had more time to think about what should and could be done better. One thing I'll modify as soon as time allows is the occi_input_iterator, which I'll implement using boost::function_input_iterator. I'll also take a look at possibly modifying the tuple utilities to use index lists to manage recursions.

I should mention that when I showed the code using the occi_sink and the occi_source to a PERL developer, the reaction was 'so what's the advantage?' Well … let's start with type safety.
