Smart Data with Conduits

If you’re a programmer now, there’s one reality you’d best be getting used to. People expect you to know how to deal with big data. The kind of data that will take a while to process. The kind that will crash your program if you try to bring it all into memory at the same time. But you also want to avoid making individual SQL requests to a database to access every single row. That would be awfully slow. So how do you solve this problem? More specifically, how do you solve this problem in Haskell?

This is exactly the type of problem the Data.Conduit library exists for. This article will go through a simple example. We’ll stream some integers using a conduit “source” and add them all together. Luckily, we only ever need to see one number at a time from the source, so we have no need to bring a ton of data into memory. Let’s first take a look at how we create this source.

Getting Our Data

Suppose we are just trying to find the sum of the numbers from 1 up to 10 million. The naive way to do this would be to have our “source” of numbers be a raw list, and then we would take the sum of this list:

myIntegerSourceBad :: [Integer]
myIntegerSourceBad = [1..10000000]

sumFromSource :: [Integer] -> Integer
sumFromSource lst = sum lst

In general, this approach can end up holding the entire list in memory at the same time. Imagine if we were querying a database with tens of millions of entries. That would be problematic. Our first step toward solving this will be to use Data.Conduit to create a “Source” of integers from this list. We can do this with the sourceList function. Note that Identity is the simple Identity monad. That is, the base monad that doesn’t actually do anything:

import Control.Monad.Identity (Identity)
import Data.Conduit (Source)
import Data.Conduit.List (sourceList)

myIntegerSource :: Source Identity Integer
myIntegerSource = sourceList [1..10000000]
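Nothing in a source runs until something downstream asks for a value. As a minimal sketch, assuming conduit 1.3 or later (where the main type is called ConduitT), the following pulls only the first five numbers out of a ten-million-element source. The names bigSource and firstFive are illustrative; CL.take comes from Data.Conduit.List.

```haskell
import Data.Conduit (ConduitT, runConduitPure, (.|))
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (Identity)

-- Same shape as myIntegerSource, with ConduitT spelled out (conduit >= 1.3).
bigSource :: ConduitT () Integer Identity ()
bigSource = CL.sourceList [1 .. 10000000]

-- CL.take awaits at most five values and returns them as a list. Once it
-- returns, the pipeline is finished, so no further values are pulled.
firstFive :: [Integer]
firstFive = runConduitPure (bigSource .| CL.take 5)
```

The consumer, not the source, decides how much work gets done.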

So now instead of returning a lump list of Integer values, we’re actually getting a stream of values we call a Source. Let’s describe the meaning of some of the conduit types so we’ll have a better idea of what’s going on here.

Conduit Types

The conduit types are all built on ConduitM, which is the fundamental monad for conduits. This type has four different parameters. We’ll write it out with those parameters like so:

type ConduitM i o m r

You should think of each conduit as a data processing funnel. The i type is the input that you can bring into the funnel. The o type stands for output. These are the values you can push out of the funnel. The m type is the underlying monad. We'll see examples with both the Identity monad and the IO monad.

We can think of the last r type as the “result”. But it’s different from the output. This is what the function itself will return as its value once it’s done. We'll see a return type when we write our sink later. But otherwise, we'll generally just use (). As a result, we can rewrite some types using the Conduit type synonym. This synonym is helpful for ignoring the return type. But it is also annoying because it flips the placement of the m and o types. Don’t let this trip you up.

type Conduit i m o = ConduitM i o m ()

Now we can define our source and sink types. A Source is a conduit that takes no input (so i is the unit type) and produces some output. Conversely, a Sink is a conduit that takes some input but produces no output (so o is the Void type):

type Source m o = Conduit () m o
type Sink i m r = ConduitM i Void m r

So in the above example, our function is a “source”. It creates a series of integer values without taking any inputs. Later, we’ll see how to connect conduits whose input and output types line up.
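For readers on conduit 1.3 or later: ConduitM was renamed to ConduitT there (ConduitM remains as a synonym), and the Source, Sink, and Conduit synonyms are deprecated in favor of writing ConduitT out directly. Here is a minimal sketch of what the synonyms expand to. The names numbersSource and collectSink are illustrative.

```haskell
import Data.Conduit (ConduitT, runConduitPure, (.|))
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (Identity)
import Data.Void (Void)

-- Source Identity Integer  ==  ConduitT () Integer Identity ()
-- (unit input, Integer output, unit result)
numbersSource :: ConduitT () Integer Identity ()
numbersSource = CL.sourceList [1 .. 5]

-- Sink Integer Identity [Integer]  ==  ConduitT Integer Void Identity [Integer]
-- (Integer input, no output, [Integer] result)
collectSink :: ConduitT Integer Void Identity [Integer]
collectSink = CL.consume

collected :: [Integer]
collected = runConduitPure (numbersSource .| collectSink)
```

Reading the sink’s ConduitT type left to right gives input, output, base monad, and result, which is the same order as ConduitM i o m r.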

Primitive Functions

There are three basic functions in the conduit library: yield, await, and leftover. Yield is how we pass a value downstream to another conduit. In other words, it is a “production” function, or a source of values. We can only yield values whose type matches the output type of our conduit.
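To see yield on its own, here is a hand-written source that does roughly what sourceList does for a numeric range. This is a sketch assuming conduit 1.3 or later. The name rangeSource is illustrative, not a library function.

```haskell
import Data.Conduit (ConduitT, runConduitPure, yield, (.|))
import qualified Data.Conduit.List as CL

-- A hand-written source: yield each number from lo to hi, then stop.
-- It works in any base monad, so it can feed pure or IO pipelines alike.
rangeSource :: Monad m => Integer -> Integer -> ConduitT () Integer m ()
rangeSource lo hi
  | lo > hi   = return ()
  | otherwise = do
      yield lo                 -- hand one value downstream
      rangeSource (lo + 1) hi  -- then continue with the next one

smallRange :: [Integer]
smallRange = runConduitPure (rangeSource 1 5 .| CL.consume)
```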

So then how do we receive values from upstream? The answer is the await function. This operates as a sink: it suspends our conduit until upstream provides a value. Naturally, this value must match the input type of the conduit. However, await doesn’t hand us the value directly. Its result is a Maybe value (Maybe i, where i is the input type). We could receive a value of Nothing. This indicates that our upstream source has terminated and we won't receive any more values. This is usually the circumstance in which we’ll let our function return.
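Here is a small sketch of await in action, assuming conduit 1.3 or later. The sink countSink is illustrative. It keeps awaiting until it sees Nothing and then returns how many values it received. The seq forces the counter on each step so it doesn’t build up a chain of unevaluated additions.

```haskell
import Data.Conduit (ConduitT, await, runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Count the values that arrive before upstream finishes.
-- await returns Just x for each value and Nothing once upstream is exhausted.
countSink :: Monad m => Int -> ConduitT i o m Int
countSink n = do
  maybeVal <- await
  case maybeVal of
    Nothing -> return n            -- upstream is done: report the count
    Just _  -> do
      let n' = n + 1
      n' `seq` countSink n'        -- force the counter, then keep waiting

countChars :: String -> Int
countChars s = runConduitPure (CL.sourceList s .| countSink 0)

-- With an empty source, the very first await already sees Nothing.
emptyAwait :: Maybe Int
emptyAwait = runConduitPure (CL.sourceList [] .| await)
```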

The final function is the leftover function. This allows you to take a value that you have already pulled from upstream and put it back upstream. This way, we can consume the value again from a different sink, or a different iteration of this sink. One caveat the docs add is that you should not use leftover with values that you have created yourself. You should ONLY use values you got from upstream.
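As a minimal sketch, assuming conduit 1.3 or later, here is a “peek” built from await and leftover. It looks at the first value, hands it straight back, and then sums the whole stream with the library’s CL.fold. The name peekThenSum is illustrative. Because the value came from upstream via await, it is a legitimate candidate for leftover.

```haskell
import Data.Conduit (ConduitT, await, leftover, runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Look at the first value without consuming it: await it, then hand it
-- back with leftover so the fold below still sees it.
peekThenSum :: Monad m => ConduitT Integer o m (Maybe Integer, Integer)
peekThenSum = do
  firstVal <- await
  mapM_ leftover firstVal   -- puts the value back only if there was one
  total <- CL.fold (+) 0    -- sums every value, including the returned one
  return (firstVal, total)

peeked :: (Maybe Integer, Integer)
peeked = runConduitPure (CL.sourceList [1 .. 5] .| peekThenSum)
```

Since the first value is returned to the stream, the total still includes it: the sum of 1 through 5 is 15, not 14.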

Writing Our Conduits

So above we’ve already written a source conduit for our toy program. It takes a list of integers and streams them into our program one at a time. Now we’ll write a sink that will take these values and add them together. This will take the form of a recursive function with an accumulator argument. Sinks often function recursively to receive their next input.

So we’ll start off by awaiting some value from the upstream conduit.

myIntegerSink :: Integer -> Sink Integer Identity Integer
myIntegerSink accum = do
  maybeFirstVal <- await
  ...

If that value is Nothing, we’ll know there are no more values coming. We can then proceed to return whatever accumulated value we have.

myIntegerSink :: Integer -> Sink Integer Identity Integer
myIntegerSink accum = do
  maybeFirstVal <- await
  case maybeFirstVal of
    Nothing -> return accum
    ...

If that value contains another Integer, we’ll first calculate the new sum by adding it to our accumulator. Then we’ll make a recursive call to the sink function with the new accumulator argument:

myIntegerSink :: Integer -> Sink Integer Identity Integer
myIntegerSink accum = do
  maybeFirstVal <- await
  case maybeFirstVal of
    Nothing -> return accum
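    Just val -> do
      let newSum = val + accum
      -- Force the running total before recursing; otherwise ten million
      -- unevaluated additions pile up in memory until the very end.
      newSum `seq` myIntegerSink newSum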

And that’s it really! Our example is simple so the code ends up being simple as well.

Combining Our Conduits

Now we’ll want to combine our conduits. We do this with the “fuse” operator, written out as =$=. (In conduit 1.3 and later, =$= is deprecated in favor of the equivalent .| operator.) Haskell libraries can be notorious for having strange operators. This library isn’t necessarily an exception. But think of conduits as a tunnel, and this operator looks like it's connecting two parts of a tunnel.

With this operator, the output type of the first conduit needs to match the input type of the second conduit. With how we’ve set up our conduits, this is the case. So now to polish things off, we use runConduitPure with our combined conduit:

fullConduit :: Integer
fullConduit = runConduitPure $ 
  myIntegerSource =$= myIntegerSink 0
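On conduit 1.3 and later, this pipeline is usually written with .| in place of =$=, and with ConduitT spelled out instead of the Source and Sink synonyms. Here is a sketch under that assumption. The names sumSink, sumUpTo, and sumUpToFold are illustrative. The seq keeps the running total evaluated at each step. The last definition shows that CL.fold, the strict left fold from Data.Conduit.List, does the same job as the hand-written sink.

```haskell
import Data.Conduit (ConduitT, await, runConduitPure, (.|))
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (Identity)
import Data.Void (Void)

-- The article's sink, with ConduitT spelled out and a strict accumulator.
sumSink :: Integer -> ConduitT Integer Void Identity Integer
sumSink accum = do
  maybeVal <- await
  case maybeVal of
    Nothing  -> return accum
    Just val -> do
      let newSum = val + accum
      newSum `seq` sumSink newSum   -- evaluate the total before recursing

-- Fuse a source into the sink with .| and run the pure pipeline.
sumUpTo :: Integer -> Integer
sumUpTo n = runConduitPure $ CL.sourceList [1 .. n] .| sumSink 0

-- The same result using the library's strict left fold.
sumUpToFold :: Integer -> Integer
sumUpToFold n = runConduitPure $ CL.sourceList [1 .. n] .| CL.fold (+) 0
```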

It’s generally quite easy using fuse to add more conduits. For instance, suppose we wanted even numbers to count double for our purposes. We could accomplish this in our source or sink, but we could also add another conduit. It will take an Integer as input and yield an Integer as output. It will wait for its input integer, check its value, and then double the value if it is even. Then we will yield the resulting value. Once again, if we haven’t seen the end of the stream, we need to recursively jump back into the conduit.

myDoublingConduit :: Conduit Integer Identity Integer
myDoublingConduit = do
  maybeVal <- await
  case maybeVal of
    Nothing -> return () 
    Just val -> do
      let newVal = if val `mod` 2 == 0
            then val * 2
            else val
      yield newVal
      myDoublingConduit

Then we can stick this conduit between our other conduits with the fuse operator!

fullConduit :: Integer
fullConduit = runConduitPure $ 
  myIntegerSource =$= myDoublingConduit =$= myIntegerSink 0
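For a one-in, one-out transformation like this, Data.Conduit.List also provides map, which saves writing the await/yield loop by hand. This is a sketch assuming conduit 1.3 or later. The names doublingConduit, doublingMap, and doubledList are illustrative, and even replaces the original mod check (the two agree on every integer).

```haskell
import Data.Conduit (ConduitT, await, runConduitPure, yield, (.|))
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (Identity)

-- The article's doubling stage, spelled with ConduitT.
doublingConduit :: ConduitT Integer Integer Identity ()
doublingConduit = do
  maybeVal <- await
  case maybeVal of
    Nothing  -> return ()
    Just val -> do
      yield (if even val then val * 2 else val)
      doublingConduit

-- The same transformation, letting CL.map handle the await/yield loop.
doublingMap :: ConduitT Integer Integer Identity ()
doublingMap = CL.map (\val -> if even val then val * 2 else val)

-- Run a stage over 1..6 and collect what comes out.
doubledList :: ConduitT Integer Integer Identity () -> [Integer]
doubledList stage = runConduitPure (CL.sourceList [1 .. 6] .| stage .| CL.consume)
```

Both stages turn 1 through 6 into 1, 4, 3, 8, 5, 12. Summing 1 through 10 through either stage gives 85: the odd numbers contribute 25 and the doubled evens contribute 60.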

Vectorizing

There are also times when you’ll want to batch up certain transactions. A great example of this is when you want to insert all results from your sink into a database. You don’t want to keep sending individual insert queries down the line. You can instead wait and group a bunch of inputs together and send batch inserts.
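Here is one way to sketch that batching idea without a real database, assuming conduit 1.3 or later. Both batchesOf and fakeInsertBatch are hypothetical helpers. batchesOf uses take from Data.Conduit.List to pull up to n values at a time. fakeInsertBatch stands in for a real batched INSERT by printing how many rows it would write.

```haskell
import Data.Conduit (ConduitT, runConduit, runConduitPure, yield, (.|))
import qualified Data.Conduit.List as CL

-- Hypothetical helper: group upstream values into lists of at most n.
-- CL.take pulls up to n values; an empty result means upstream is finished.
batchesOf :: Monad m => Int -> ConduitT a [a] m ()
batchesOf n = do
  batch <- CL.take n
  if null batch
    then return ()
    else do
      yield batch
      batchesOf n

-- Hypothetical stand-in for one batched INSERT statement.
fakeInsertBatch :: [Integer] -> IO ()
fakeInsertBatch rows = putStrLn ("INSERT of " ++ show (length rows) ++ " rows")

-- One "insert" per batch instead of one per value.
insertAll :: [Integer] -> IO ()
insertAll xs = runConduit $ CL.sourceList xs .| batchesOf 3 .| CL.mapM_ fakeInsertBatch

-- A pure preview of how seven values get grouped.
previewBatches :: [[Int]]
previewBatches = runConduitPure (CL.sourceList [1 .. 7] .| batchesOf 3 .| CL.consume)
```

With a batch size of 100000 instead of 3, the same shape would issue one query per 100000 values rather than one per value.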

For our toy example, we’ll gather our integers in groups of 100000 so that we can log progress updates along the way. This requires changing our conduits to live on top of the IO monad: conduitVector needs a base monad with a PrimMonad instance (IO has one, Identity doesn’t), and printing progress needs IO anyway. Once we’ve made this change, we can use the conduitVector function like so:

fullConduitIO :: IO Integer
fullConduitIO = runConduit $ 
  myIntegerSourceIO =$= conduitVector 100000 =$= myIntegerVectorSink 0

myIntegerSourceIO :: Source IO Integer
myIntegerSourceIO = sourceList [1..10000000]

myIntegerVectorSink :: Integer -> Sink (Vector Integer) IO Integer
myIntegerVectorSink accum = do
  maybeFirstVal <- await
  case maybeFirstVal of
    Nothing -> return accum
    Just vals -> do
      let newSum = (Vec.sum vals) + accum
      lift $ print newSum
      myIntegerVectorSink newSum

By vectorizing the conduit, we need to change the sink so that it takes Vector Integer as its input type instead of Integer. Each await now gives us a whole vector, so we sum its values with Vec.sum before adding them to the accumulator.
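To see exactly what conduitVector produces, here is a small sketch assuming conduit 1.3 or later (where the Conduit module ships with conduit) plus the vector package. The names chunkInto and chunkSums are illustrative. According to the conduitVector documentation, the final vector may be shorter than the requested size when the input doesn’t divide evenly. With ten values in chunks of three, the last chunk holds just the value 10.

```haskell
import Conduit (conduitVector)
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import qualified Data.Vector as V

-- Split a list into boxed vectors of at most n elements using conduitVector.
-- Runs in IO because conduitVector requires a PrimMonad base monad.
chunkInto :: Int -> [Integer] -> IO [V.Vector Integer]
chunkInto n xs = runConduit $ CL.sourceList xs .| conduitVector n .| CL.consume

-- Sum each chunk, mirroring myIntegerVectorSink without the printing.
chunkSums :: Int -> [Integer] -> IO [Integer]
chunkSums n xs = map V.sum <$> chunkInto n xs
```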

Summary

The Data.Conduit library allows you to deal with large amounts of data in a memory-efficient way. You can use it to stream data from a database or some other source. This is more efficient than bringing a large chunk of information into memory all at once. It also allows you to pass information through “tunnels”, called conduits. You can make these perform many complicated operations. You mainly compose conduit functions from the yield, await and leftover functions. You merge conduits together into a larger conduit with the “fuse” operator =$=. You can also use the conduitVector function to batch certain operations.

This was a more advanced topic. If you’ve never written Haskell before, it’s not as scary as this article makes it out to be! Check out our Getting Started Checklist to take the first steps on your Haskell journey!

If you’ve done a little bit of Haskell before but need some more practice on the fundamentals, you should download our Recursion Workbook. It has some great material on recursion as well as 10 practice problems!

Next week we’ll keep up with some more advanced topics. We’ll look into the Data.Aeson library and how it allows us to serialize Haskell objects into JSON format! So stay tuned to Monday Morning Haskell!

Appendix

I mentioned earlier that operators can be a major pain point in Haskell. Unfortunately, so can documentation. This can be especially true when tracking down the right docs for the conduit concepts. So for reference, here are all the imports I used:

import Conduit (conduitVector, lift)
import Control.Monad.Identity (Identity)
import Data.Conduit (Source, Sink, await, runConduitPure, (=$=), Conduit, yield, runConduit)
import Data.Conduit.List (sourceList)
import Data.Vector (Vector)
import qualified Data.Vector as Vec

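Note that Data.Conduit and Data.Conduit.List come from the conduit package on Hackage. However, the Conduit module actually came from conduit-combinators, which is easy to miss. (Starting with conduit 1.3, conduit-combinators was merged into conduit, so the Conduit module now ships with conduit itself.) Data.Vector comes from the vector package.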
