Friday, November 22, 2019

A transactional extension to para

He said that for a sorcerer, the world of everyday life is not real, or out there, as we believe it is. For a sorcerer, reality, or the world as we all know, is only a description.

-- Carlos Castaneda, Journey to Ixtlan: The Lessons of Don Juan. Washington Square Press 1972, page viii

Introduction

In a previous blog entry, Efficient processing of line based text, I described the program para. That entry showed how processing time can be reduced significantly by using para to spread the lines of an input stream across multiple identical processes, each processing one line at a time.

When running para on very large files, where processing time is counted in days rather than minutes, it would be beneficial to restart from a well-known point should processing crash in the middle, instead of re-running from the beginning.

In this blog entry I give a very short overview of an extension to para that supports a simple form of transactions, allowing recovery from crashes in a safe and predictable way. More detailed documentation can be found in the github README file.

Transactions in para

Transactions in para are committed every N lines, where N is specified as a command line parameter. When a transaction is committed, a transaction log is written to a file on disk. The transaction log is either new or replaces the previous transaction log. In either case, the log is created atomically.
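One common way to create or replace a file atomically on POSIX systems is to write a temporary file in the same directory and then rename it over the target. The Python sketch below illustrates that technique only. It is not taken from the para source, and the function name replace_atomically and the temporary file prefix are made up for illustration.

```python
import os
import tempfile


def replace_atomically(path, payload):
    """Write payload to path so that readers see either the old or the new file.

    Hypothetical helper: the temporary file lives in the same directory as the
    target so that the final rename stays within one file system.
    """
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".txnlog-")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(payload)
            tmp.flush()
            os.fsync(tmp.fileno())   # push the bytes to disk before renaming
        os.replace(tmp_path, path)   # atomic rename over any previous log
    except BaseException:
        # Remove the temporary file if anything failed before the rename.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


if __name__ == "__main__":
    workdir = tempfile.mkdtemp()
    log = os.path.join(workdir, "example.txnlog")
    replace_atomically(log, b"first commit")
    replace_atomically(log, b"second commit")
    with open(log, "rb") as f:
        print(f.read())
```

A crash before the rename leaves the previous log untouched, and a crash after it leaves the new log in place, so a reader never sees a half-written file.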

If a transaction log exists and para is started in recovery mode, para will read the transaction log and adjust its processing based on the information in the log. For example, say the transaction log contains the following information:

  • #lines committed: 10
  • position in output file: 1038

When para recovers, it will skip the first 10 lines of the input, seek to position 1038 in the output stream (if the stream corresponds to a file), and then continue normal processing.
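The recovery step can be pictured with a few lines of Python. This is a sketch under assumptions, not para's actual code: the function name resume is hypothetical, and truncating the output at the committed position is my own choice to make the example tidy (para may simply overwrite from that offset).

```python
def resume(input_path, output_path, lines_committed, outfile_pos):
    """Reopen input and output positioned just after the last commit.

    Hypothetical helper: returns (src, dst) file objects. The output file is
    assumed to exist already, since a transaction log refers to it.
    """
    src = open(input_path, "rb")
    for _ in range(lines_committed):
        src.readline()            # skip lines that are already committed
    dst = open(output_path, "r+b")
    dst.seek(outfile_pos)         # go back to the committed output position
    dst.truncate()                # assumption: drop any uncommitted tail
    return src, dst
```

With the numbers above, resume("input.txt", "output.out", 10, 1038) would hand back an input positioned at line 11 and an output positioned at byte 1038.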

Examples

To check whether para will attempt to recover from a stored transaction log, the following command can be executed:

$ para -r

para will output something like:

recovery info --> #lines-committed: 1100, outfile-pos: 133664

Say we have input.txt:


1
2
3
4
5

and an executable exe.bash:

#!/bin/bash
# Echo each input line unchanged, one line per second.
while IFS= read -r line; do
  printf '%s\n' "$line"
  sleep 1
done

and we execute (-R is recovery and -C 2 enables transactions every 2 lines):

$ para -v -i input.txt -o output.out -R -C 2 -- 1 ./exe.bash

the output file output.out then contains:

1
2
3
4
5

Now, if we execute the same command but hit ^C after seeing the message:

info: committing at 2 lines...

The output is now:

1
2

or possibly:

1
2
3

and the file .para.txnlog contains (shown using od -x .para.txnlog):

0000000 0002 0000 0000 0000 0004 0000 0000 0000
0000020
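The dump above is 16 bytes long and looks like two 64-bit integers, which matches the two values reported by para -r. The Python sketch below decodes it under that assumption; the layout (two unsigned little-endian 64-bit fields, lines committed first) is inferred from the od output on a little-endian machine, not taken from the para documentation, and decode_txnlog is a made-up name.

```python
import struct


def decode_txnlog(raw):
    """Decode a 16-byte log as (lines_committed, outfile_pos).

    Assumed layout: two unsigned 64-bit little-endian integers.
    struct.unpack raises struct.error if raw is not exactly 16 bytes.
    """
    lines_committed, outfile_pos = struct.unpack("<QQ", raw)
    return lines_committed, outfile_pos


if __name__ == "__main__":
    # od -x prints 16-bit words in host byte order, so on a little-endian
    # machine "0002 0000 0000 0000" corresponds to the bytes 02 00 00 00 ...
    raw = bytes.fromhex("0200000000000000" "0400000000000000")
    print(decode_txnlog(raw))
```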

We can view the transaction log by executing:

$ para -r

para reports that 2 lines were committed and that, in recovery mode, output will start at position 4 in the output file:

recovery info --> #lines-committed: 2, outfile-pos: 4

Now we can restart para:

$ para -v -i input.txt -o output.out -R -C 2 -- 1 ./exe.bash

para now writes:

info: skipping first: 2 lines in recovery mode, outfilepos: 4 ...
info: positioning to offset: 4 in output stream
info: committing at 4 lines ...
debug: #timers in queue: 1 (expected: 1 HEARTBEAT timer)
info: committing at 5 lines ...
debug: closing files ...
debug: waiting for child processes ...
debug: cleaning up memory ...
debug: ... cleanup done

We see that para skips 2 lines in the input file and positions at file position 4 in the output file before starting to process. The output file is now:

1
2
3
4
5

Thursday, July 18, 2019

Efficient processing of line based text

The less we understand something, the more variables we require to explain it.

-- Ackoff, The Art Of Problem Solving (p. 111)

Introduction

This blog entry contains a few notes about the para program I put on github (https://github.com/hansewetz/para). More detailed documentation can be found in the github README file.

Background to the para program

A typical processing task in computational linguistics applies some processing function to each line in a file. The result of processing a line is written to some output, typically stdout. A constraint on the processing is normally that the generated output must be in the same order as the lines read from the input.

The idea behind the para program was (as far as I know) invented by one or more computational linguists. The problem para solves is to parallelize processing of lines in a text file while ensuring that the result of processing line N is written to the output as line N.

A small example

Say you want to calculate the md5 sum of each line in a file. First we'll write a small bash script md5.bash:

#!/bin/bash
# Read lines verbatim (no backslash or whitespace mangling) and hash each one.
while IFS= read -r line; do
  printf '%s\n' "$line" | md5sum | awk '{print $1}'
done

The following command:

$ echo hello | ./md5.bash

will print:

b1946ac92492d2347c6235b4d2611184

Now say you have a very large file containing many millions of lines of text and want to calculate the md5 sum for each line in the file. If the file monolingual.en.gz is a gzipped file containing the text to process, we can execute the command:

$ time zcat monolingual.en.gz | ./md5.bash > out1.md5sum

On one of the machines I use I get the following timing information:

real    21m22.874s
user    10m18.485s
sys     28m48.447s

The problem when calculating the md5 sum for many millions of lines is that it takes time. Using para we can speed up the calculation by running 5 processing units in parallel:

$ time zcat monolingual.en.gz | para -- 5 ./md5.bash > out2.md5sum

Running the example on the same machine as before, I now get:

real    4m34.788s
user    10m42.282s
sys     29m59.132s

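We can see that processing is now approximately 5 times faster: wall-clock time drops from about 21 minutes to about 4.5 minutes, a factor of roughly 4.7, while the user and sys CPU times stay about the same. This is because para was executed with 5 sub-processes running in parallel.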

How does para work?

para internally starts a number of sub-processes. In our example para started 5 sub-processes where each sub-process executed the command specified on the para command line.

Internally para sets up a pipe (to be precise, a full duplex socket pair) between itself and each sub-process. Each line read from the input is fed into one of the sub-processes. Once a line has been sent to a sub-process, para does some bookkeeping to track the order of lines read and lines sent to sub-processes.
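To make the socket pair idea concrete, here is a minimal Python sketch of a parent talking to one sub-process over a single full duplex socket. It only illustrates the general mechanism on a Unix system, not para's C implementation. The names start_worker and WORKER are hypothetical, and the worker is a stand-in that upper-cases each line and flushes after every line.

```python
import socket
import subprocess
import sys

# Hypothetical worker command: upper-case each line, flushing every reply so
# the parent never waits on output stuck in a buffer.
WORKER = [sys.executable, "-c",
          "import sys\n"
          "for line in sys.stdin:\n"
          "    sys.stdout.write(line.upper())\n"
          "    sys.stdout.flush()\n"]


def start_worker(cmd):
    """Start cmd with stdin and stdout both attached to one end of a socket pair."""
    parent_end, child_end = socket.socketpair()
    proc = subprocess.Popen(cmd, stdin=child_end.fileno(),
                            stdout=child_end.fileno())
    child_end.close()   # the child keeps its own copies of the descriptor
    return proc, parent_end


if __name__ == "__main__":
    proc, sock = start_worker(WORKER)
    reader = sock.makefile("rb")
    sock.sendall(b"hello\n")             # feed one line to the sub-process
    print(reader.readline())             # read the processed line back
    sock.shutdown(socket.SHUT_WR)        # signal end of input to the worker
    proc.wait()
    reader.close()
    sock.close()
```

A single socket carries traffic in both directions, so the parent needs only one descriptor per sub-process instead of two pipes.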

When a sub-process has finished processing a line, para reads the result through the socket pair. Using its bookkeeping machinery, para will either write the result to its output or, when writing it would violate output ordering, buffer it internally. If the result is buffered, para tracks when the buffered result can be written so that the input and output ordering stays aligned.
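The ordering logic can be sketched with a priority queue keyed on the input line number. The Python below is a simplified illustration under my own assumptions (results arrive as (line number, text) pairs, numbered from 0); the function name write_in_order is made up and the code is not a description of para's internal data structures.

```python
import heapq


def write_in_order(completed):
    """Yield results in input order.

    completed: iterable of (line_number, result) pairs in the order the
    sub-processes finished, which may differ from the input order.
    """
    pending = []      # min-heap holding results that arrived too early
    next_line = 0     # number of the next line that may be written
    for line_no, result in completed:
        heapq.heappush(pending, (line_no, result))
        # Flush every buffered result that is now next in line.
        while pending and pending[0][0] == next_line:
            yield heapq.heappop(pending)[1]
            next_line += 1


if __name__ == "__main__":
    finished = [(1, "b"), (2, "c"), (0, "a"), (3, "d")]
    print(list(write_in_order(finished)))   # ['a', 'b', 'c', 'd']
```

Here the results for lines 1 and 2 are held back until line 0 arrives, after which all three are written in one go.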

Comments on implementation

For portability reasons the code is written in plain C. My preferred choice would have been C++ together with boost since plain C does not have the same type of standard underlying support libraries. Additionally the language is ... let's say ... pretty limited compared to modern C++. However, C++ and boost create a number of installation and portability issues which are not always easy to solve.

Because of the limited standard library support for data structures in plain C, some data structures and algorithms were implemented from scratch. For example, priority queues, queues and buffers are implemented in the para source code tree.

Improvements to be implemented

para writes one line at a time to a sub-process. If the work done by the sub-process is not time consuming, reading from and writing to sub-processes can easily become the bottleneck, with para itself consuming most of the CPU time. A conceptually straightforward optimization would be for para to write multiple lines in one shot to sub-processes.

para buffers have a fixed size and cannot be dynamically re-sized. This can be annoying since it requires a user either to set a maximum line length before running para, or to run with the default setting and hope that no line is longer than the default maximum. In reality this is not such a big problem since para will not crash in an uncontrolled fashion. If a line is longer than the buffer size, para will simply terminate with an error message.

Features to be implemented

para should be able to read input by connecting to a network based server which feeds para with lines of text. para should also be able to connect and send output to a network based server. Exactly how to control the number of lines read from the network based server must be thought through and specified.

Download and install para

para can be downloaded and installed from https://github.com/hansewetz/para. Building and installing para is a straightforward cmake-based process, detailed as 5 simple steps on the github page.