Dataflow in Ruby

Our job is not to write software. Our job is to turn data into information. That we do that through software is an implementation detail. — Dan North, ScanDev 2013.

Two things about data these days: there’s a lot of it, and a lot of it is crap. How can we get information out of that?

I like to think of my code in terms of a pipeline for data, transforming and working it and summarizing it into information. In order to handle growing amounts of data, we can’t process it all at once; we have to handle it piece by piece as it comes in. Each piece gets summarized in all the different ways and then discarded, so we never run out of memory.

With lazy Enumerables (known in other languages as Iterables), it’s easy enough to accommodate massive data, since they deal with one item at a time. However, every time you run a reduce over the output, the entire input is read again. How can we perform multiple summaries over the same flow of data as it comes through?
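To make the re-reading concrete, here is a small sketch. The source enumerator and its read counter are made up for illustration; they stand in for a big file.

```ruby
# Hypothetical stand-in for a big input: counts every item actually read.
reads = 0
source = Enumerator.new do |y|
  [3, 1, 4, 1, 5].each do |n|
    reads += 1
    y << n
  end
end

doubled = source.lazy.map { |n| n * 2 }

total = doubled.reduce(0) { |sum, n| sum + n }  # first pass over the input
count = doubled.reduce(0) { |c, _| c + 1 }      # second pass over the same input

puts "total=#{total} count=#{count} reads=#{reads}"
# prints "total=28 count=5 reads=10"
```

Five items, two summaries, ten reads. With a file instead of an array, that is two full trips through the file.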

My goal: process data lazily, with multiple reductions of various subsets, without mutating state. In Ruby, because Ruby is like Play-Doh for programmers.

My code is in a branch of my fp4rd repo (that’s Functional Principles for Ruby Development). This post proceeds with an explanation of the example problem and then how the solution works.

If you can make it to Ruby Midwest this weekend or Windy City Rails in September, come hear about functional principles for Ruby developers. This little project is the extreme endpiece of that talk.

Problem statement

In Chapter 3 of the Pickaxe, there’s an example of a program that parses book-inventory CSV data like this:

Title,ISBN,Price
To Whom It May Concern,48-2342-182u32,98.56
What Is Your Problem,2938-123883-13,13.99

Of course, being data, it doesn’t all look like that. Some of it is completely unparsable:

Title,That Number Under The Barcode,Amount
George of the Jungle,234-34-,99.44

and some of it is parsable but missing information:

Title,ISBN,Price
Your Mother Was a Lizard,,32.99
Your Father Stank of Elderberries,234-2145-ldk-234,

My program will parse these files. It totals the price of all books that are parsable and have a price; it also counts these, and it counts the lines that were read but not totaled.

Crazy-looking solution

Code is here, and here’s the meat of it.

pipe = Pipeline::Pipe.new.
  expand(printing.("--- Reading file...",&read_all_lines)).
  through(printing.("1. Converting book",&convert_row_to_book)).
  through(printing.("2. Checking price",&reject_no_price)).
  split(
    invalid: Pipeline::Pipe.new.keeping(->(a){a.invalid?}).count,
    valid: Pipeline::Pipe.new.keeping(printing.("3a. Checking book", ->(a){a.book?})).
      split( count: Pipeline::Pipe.new.count,
             total: Pipeline::Pipe.new.
        through(printing.("3b. Extracting book", ->(a){a.book})).
        through(printing.("4. Pricing",->(a){a.price})).
        answer(Pipeline::Monoid.plus)
      )
  )

result = pipe.flow(ARGV)

totalPrice = result.value(:valid, :total)
validCount = result.value(:valid, :count)
errorCount = result.value(:invalid)

What is it doing?
It’s setting up a pipeline, a pipeline with three outlets.
It pushes some data through.
It follows the three routes to get the answer at each end.

It’s going like this:

Files from ARGV go through one at a time. They get expanded into lines, which go through a transformation into books. Data is sent through all the routes of each split. Some gets filtered out by “keeping”.[1]

Crazy internals

As the pipe is constructed, it builds up a list of functions. Each function is an iteratee.

Let’s talk about iterators

Say you have an Enumerable. It represents stuff in a particular order. There are two ways to move through that stuff and do something with it:

External iteration means you control the flow of when to get the next one. Get an external iterator from a Ruby Enumerable by calling .each with no arguments. Then call .next on that whenever you please to get an element. Or a StopIteration exception, when there’s no more data. [2]

> e = [1,2,3].each
> e.next
 => 1
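
A slightly longer sketch of the same idea, showing what happens when the data runs out. It uses only core Ruby: calling each without a block returns an Enumerator, next raises StopIteration at the end, and Kernel#loop rescues that exception for you.

```ruby
e = [1, 2, 3].each   # no block given, so this returns an Enumerator
first  = e.next      # 1
second = e.next      # 2
third  = e.next      # 3

# A fourth call has nothing left to return, so it raises StopIteration.
finished = begin
  e.next
  false
rescue StopIteration
  true
end

# Kernel#loop rescues StopIteration, so this ends quietly after three items.
seen = []
e2 = [1, 2, 3].each
loop { seen << e2.next }

puts finished      # prints "true"
puts seen.inspect  # prints "[1, 2, 3]"
```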

Internal iteration means you say what to do with each element, but the Enumerable controls the process of flipping through them. This is the usual pattern in Ruby.

 > [1,2,3].each { |i| puts i }

Two advantages of internal iteration: no mutating state in my code; the Enumerable can perform cleanup after completion, like closing the input file.
One advantage of external iteration: if I want to stop early, I can. If my objective was to find one book that costs more than $5, controlling when to stop could save a lot of time.
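
Here is a sketch of that early stop with an external iterator. The prices are made up, and note the mutable counters: this is exactly the kind of state that internal iteration lets me avoid.

```ruby
prices = [1.99, 3.50, 7.25, 12.00, 0.99].each  # external iterator over made-up prices
pulled = 0
found = nil

loop do
  price = prices.next
  pulled += 1
  if price > 5
    found = price
    break          # stop early: the last two prices are never pulled
  end
end

puts "found #{found} after pulling #{pulled} items"
# prints "found 7.25 after pulling 3 items"
```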

To get the best of both, Oleg Kiselyov came up with this idea of iteratees. You hand the Enumerable a function, but that function doesn’t just return a transformed value (like map).  Instead, your function returns a message, one of:

  • “That’s great man, keep going, and next time do this”
  • “I’m done, and here’s the final result”

If you decide “keep going,” then included in the message is another function: the next piece of data will get passed in to that one. So the function you give the Enumerable gets executed exactly once, and supplies either a result or another function. Higher-order functions in action!

For added message-passing goodness, your function doesn’t always receive data. After the last item in the Enumerable is processed, it sends in End Of File. When you get that, you’d better return a final result.
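
A minimal sketch of that protocol, using plain lambdas. Every name here (Continue, Done, END_OF_INPUT, feed, first_over, summing) is invented for this illustration and is not what my pipeline code uses. The driver reassigns a local variable to hold the current function; the iteratees themselves carry no mutable state.

```ruby
Continue = Struct.new(:next_step)  # "keep going, and next time do this"
Done     = Struct.new(:result)     # "I'm done, and here's the final result"
END_OF_INPUT = :end_of_input       # marker sent after the last item

# Iteratee: stops at the first item over the limit; nil if none shows up.
def first_over(limit)
  lambda do |msg|
    if END_OF_INPUT.equal?(msg)
      Done.new(nil)
    elsif msg > limit
      Done.new(msg)
    else
      Continue.new(first_over(limit))
    end
  end
end

# Iteratee: never stops early; reports the sum at end of input.
def summing(so_far = 0)
  lambda do |msg|
    if END_OF_INPUT.equal?(msg)
      Done.new(so_far)
    else
      Continue.new(summing(so_far + msg))
    end
  end
end

# Driver: feeds items to the current step until it says Done,
# or sends END_OF_INPUT when the data runs out.
# Returns [result, number of items consumed].
def feed(enumerable, step)
  consumed = 0
  enumerable.each do |item|
    reply = step.call(item)
    consumed += 1
    return [reply.result, consumed] if reply.is_a?(Done)
    step = reply.next_step
  end
  [step.call(END_OF_INPUT).result, consumed]
end

prices_in_cents = [199, 350, 725, 1200]
p feed(prices_in_cents, first_over(500))  # prints [725, 3]
p feed(prices_in_cents, summing)          # prints [2474, 4]
```

The first run stops after three items; the second reads all four and only answers at end of input.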

My iteratees

That’s what happens in the pipeline: each piece of the pipe is an iteratee, which can receive a message with data or an EOF, and returns another piece (for next time) or the final result. For instance, look at the end piece count:

  class CountingEndPiece
    include PieceCommon
    def initialize(soFar = 0)
      @soFar = soFar
    end 
    def eof 
      SimpleResult.new(@soFar)
    end 
    def receive msg 
      CountingEndPiece.new(@soFar + 1)
    end 
  end 

At EOF, it always gives a result. For a piece of data, it always gives back another one of itself with an incremented count.[3]

That was an end piece. What about the middle pieces?

They’re iteratees too, except they delegate to the next piece. A “through” piece does a transformation and then delegates. A “keeping” piece delegates only if the predicate is satisfied, otherwise returning itself again, waiting for the next piece of data.
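
Roughly, the delegation looks like the sketch below. These are simplified stand-ins with made-up names, not the classes in the repo, and they assume the downstream piece always hands back another piece until EOF (no early results).

```ruby
SketchResult = Struct.new(:value)

# End piece: counts messages, answers at EOF.
class SketchCount
  def initialize(so_far = 0)
    @so_far = so_far
  end

  def eof
    SketchResult.new(@so_far)
  end

  def receive(_msg)
    SketchCount.new(@so_far + 1)
  end
end

# Middle piece: transforms the message, then delegates.
class SketchThrough
  def initialize(transform, downstream)
    @transform = transform
    @downstream = downstream
  end

  def eof
    @downstream.eof
  end

  def receive(msg)
    SketchThrough.new(@transform, @downstream.receive(@transform.call(msg)))
  end
end

# Middle piece: delegates only when the predicate holds.
class SketchKeeping
  def initialize(predicate, downstream)
    @predicate = predicate
    @downstream = downstream
  end

  def eof
    @downstream.eof
  end

  def receive(msg)
    return self unless @predicate.call(msg)  # nothing changed; wait for the next message
    SketchKeeping.new(@predicate, @downstream.receive(msg))
  end
end

# words -> lengths -> keep lengths over 3 -> count
inlet = SketchThrough.new(->(s) { s.length },
                          SketchKeeping.new(->(n) { n > 3 }, SketchCount.new))
final = %w[a bbbb cc ddddd].reduce(inlet) { |piece, word| piece.receive(word) }.eof
puts final.value  # prints 2
```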

End Construction

The Pipe uses the builder pattern, and the build is triggered by an end piece: answer, count, or split. Once the end is reached, all the pieces can be hooked to each other, and the inlet returned.

Sorry About the Monoids

I couldn’t resist using monoids for answer. Don’t worry about them: they’re a class that defines both a combining-method and a starting point for the reduce. Adding integers and concatenating strings are different monoids because they have different starting points. The starting points are necessary for the empty data case.
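
If it helps, here is about the smallest monoid I can sketch in Ruby. SketchMonoid and its method names are invented for this example; the real Pipeline::Monoid may look different.

```ruby
# A combining function plus a starting point ("zero").
class SketchMonoid
  attr_reader :zero

  def initialize(zero, &combine)
    @zero = zero
    @combine = combine
  end

  def append(a, b)
    @combine.call(a, b)
  end

  # Reduce a collection, starting from zero so empty input still has an answer.
  def fold(items)
    items.reduce(zero) { |acc, item| append(acc, item) }
  end
end

plus = SketchMonoid.new(0) { |a, b| a + b }
join = SketchMonoid.new("") { |a, b| a + b }

p plus.fold([1, 2, 3])   # prints 6
p plus.fold([])          # prints 0
p join.fold(%w[ab cd])   # prints "abcd"
p join.fold([])          # prints ""
```

Same combining method, different starting points, so the empty case comes out as 0 for one and "" for the other.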

Flow

All the data goes in, and is passed to all the iteratees down the line. When everything has returned a Result (which happens at EOF in the implemented cases), then the pipe is resolved.

result = pipe.flow(ARGV)
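
To show the shape of this without the whole library, here is a stripped-down sketch of a split with two end pieces and a flow function. All the names are hypothetical, and the sketch assumes every piece answers only at EOF. The read counter is there to show that the input is walked once even though there are two summaries.

```ruby
SketchAnswer = Struct.new(:value)

class SketchCounter
  def initialize(n = 0)
    @n = n
  end

  def receive(_msg)
    SketchCounter.new(@n + 1)
  end

  def eof
    SketchAnswer.new(@n)
  end
end

class SketchSummer
  def initialize(total = 0)
    @total = total
  end

  def receive(msg)
    SketchSummer.new(@total + msg)
  end

  def eof
    SketchAnswer.new(@total)
  end
end

# Sends every message down every labeled route.
class SketchSplit
  def initialize(routes)  # routes: { label => piece }
    @routes = routes
  end

  def receive(msg)
    SketchSplit.new(@routes.transform_values { |piece| piece.receive(msg) })
  end

  def eof
    SketchAnswer.new(@routes.transform_values { |piece| piece.eof.value })
  end
end

# Push each item into the current inlet, then signal EOF.
def flow(inlet, items)
  items.reduce(inlet) { |piece, item| piece.receive(item) }.eof
end

reads = 0
prices_in_cents = Enumerator.new do |y|
  [1399, 9856, 3299].each do |price|
    reads += 1
    y << price
  end
end

splitter = SketchSplit.new({ count: SketchCounter.new, total: SketchSummer.new })
result = flow(splitter, prices_in_cents)

p result.value  # count is 3, total is 14554
p reads         # prints 3
```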

Catching the output

The pipeline has multiple paths, so follow them using the symbol-labels. That’ll get you to the answer at the end of each pipe.

totalPrice = result.value(:valid, :total)
validCount = result.value(:valid, :count)
errorCount = result.value(:invalid)

Conclusion

There you have it: Iteratees in Ruby, and some sort of dataflow pipeline. This is one way to think about data as a river running through your program without getting washed away. Each piece of incoming data was retained as information: either it was counted, or it was counted and its price totaled.

This is one way to handle a crapton of craptastic data. In Ruby!

-----
[1] I know, I know, “keeping” is filter and “through” is map and “expand” is flatmap. But you know what? These names don’t make that much sense in English. They only make sense to people already familiar with functional programming vocabulary and idioms, and that’s not my primary audience.

[2] Why does Ruby’s external iterator not have some sort of hasNext method? Or is there one and I can’t recognize it?

[3] If I were writing this for production I’d totally keep a stateful count and return itself after each message. But that’s a mutable-state optimization, and this is an exercise in functional style.
