Pipes are a very simple but powerful abstraction which can be used to implement stream-based IO, in a very similar fashion to iteratees and friends, or conduits. In this post, I introduce **guarded pipes**: a slight generalization of pipes which makes it possible to implement a larger class of combinators.

```
{-# LANGUAGE NoMonomorphismRestriction #-}
module Blog.Pipes.Guarded where
import Control.Category
import Control.Monad.Free
import Control.Monad.Identity
import Data.Maybe
import Data.Void
import Prelude hiding (id, (.), until, filter)
```

The idea behind pipes is straightforward: fix a base monad `m`, then construct the free monad over a specific `PipeF` functor:

```
data PipeF a b m x = M (m x)
                   | Yield b x
                   | Await (Maybe a -> x)

instance Monad m => Functor (PipeF a b m) where
  fmap f (M m) = M $ liftM f m
  fmap f (Yield x c) = Yield x (f c)
  fmap f (Await k) = Await (f . k)

type Pipe a b m r = Free (PipeF a b m) r
```

Generally speaking, a free monad can be thought of as an embedded language in CPS style: every summand of the base functor (`PipeF` in this case) is a primitive operation, while the `x` parameter represents the continuation at each step.

In the case of pipes, `M` corresponds to an effect in the base monad, `Yield` produces an output value, and `Await` blocks until it receives an input value, then passes it to its continuation. You can see that the `Await` continuation takes an argument of type `Maybe a`: this is the only thing that distinguishes guarded pipes from regular pipes (as implemented in the pipes package on Hackage). The idea is that `Await` will receive `Nothing` whenever the pipe runs out of input values. That will give it a chance to do some cleanup or yield extra outputs. Any additional `Await` after that point will terminate the pipe immediately.

We can write a simplistic list-based (strict) interpreter formalizing the semantics I just described:

The boolean parameter is going to be set to `True` as soon as we execute an `Await` with an empty input list.

A `Pure` value means that the pipe has terminated spontaneously, so we return the accumulated output list:

Execute inner monadic effects:

Save yielded values into the accumulator:

If we still have values in the input list, feed one to the continuation of an `Await` statement.

If we run out of inputs, pass `Nothing` to the `Await` continuation…

… but only the first time. If the pipe awaits again, terminate it.
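Assembled into one definition, the cases described above might look like the following. This is a reconstruction from the prose, so details such as the helper name `go` and the accumulator being kept in reverse order are assumptions rather than the original code:

```haskell
evalPipe :: Monad m => Pipe a b m r -> [a] -> m [b]
evalPipe p xs = go False xs [] p
  where
    -- the pipe terminated on its own: return the accumulated outputs
    -- (the accumulator is built in reverse, an assumption of this sketch)
    go _ _ ys (Pure _) = return (reverse ys)
    -- run an effect in the base monad, then continue
    go t xs' ys (Free (M m)) = m >>= go t xs' ys
    -- save a yielded value into the accumulator
    go t xs' ys (Free (Yield y c)) = go t xs' (y : ys) c
    -- input still available: feed the next value to the continuation
    go t (x : xs') ys (Free (Await k)) = go t xs' ys (k (Just x))
    -- input exhausted for the first time: pass Nothing and set the flag
    go False [] ys (Free (Await k)) = go True [] ys (k Nothing)
    -- the pipe awaits again after exhaustion: terminate it
    go True [] ys (Free (Await _)) = return (reverse ys)
```

The return value of the pipe is ignored here; only the yielded values are collected.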

To simplify the implementation of actual pipes, we define the following basic combinators:

```
tryAwait :: Monad m => Pipe a b m (Maybe a)
tryAwait = wrap $ Await return
yield :: Monad m => b -> Pipe a b m ()
yield x = wrap $ Yield x (return ())
lift :: Monad m => m r -> Pipe a b m r
lift = wrap . M . liftM return
```

and a couple of secondary combinators, very useful in practice. First, a pipe that consumes all input and never produces output:

then a simplified `await` primitive, that dies as soon as we stop feeding values to it.
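A minimal sketch of these two combinators, reconstructed from their descriptions (the exact definitions are an assumption; only the names `discard` and `await` come from the rest of this post):

```haskell
-- Consumes all input and never yields. It never returns by itself,
-- so its result type r is unconstrained.
discard :: Monad m => Pipe a b m r
discard = forever tryAwait

-- Like tryAwait, but on end of input it falls into discard, whose
-- next Await terminates the pipe.
await :: Monad m => Pipe a b m a
await = tryAwait >>= maybe discard return
```

Defining `await` through `discard` relies on the rule stated earlier: once a pipe has received `Nothing`, any further `Await` terminates it.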

now we can write a very simple pipe that sums consecutive pairs of numbers:

```
sumPairs :: (Monad m, Num a) => Pipe a a m ()
sumPairs = forever $ do
  x <- await
  y <- await
  yield $ x + y
```

we get:
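As a concrete sketch, assuming an interpreter `evalPipe :: Monad m => Pipe a b m r -> [a] -> m [b]` with the semantics described earlier (`ex1` is an illustrative name), running `sumPairs` over a five-element list yields two sums and drops the unpaired last element, because the second `await` dies when the input runs out:

```haskell
ex1 :: [Int]
ex1 = runIdentity $ evalPipe sumPairs [1, 2, 3, 4, 5]
{- ex1 == [3,7] -}
```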

## Composing pipes

The usefulness of pipes, however, is not limited to being able to express list transformations as monadic computations using the `await` and `yield` primitives. In fact, it turns out that two pipes can be composed sequentially to create a new pipe.

```
infixl 9 >+>
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
(>+>) = go False False
  where
```

When implementing `evalPipe`, we needed a boolean parameter to signal upstream input exhaustion. This time, we need two boolean parameters, one for the input of the upstream pipe, and one for its output, i.e. the input of the downstream pipe. First, if downstream does anything other than waiting, we just let the composite pipe execute the same action:

```
    go _ _ p1 (Pure r) = return r
    go t1 t2 p1 (Free (Yield x c)) = yield x >> go t1 t2 p1 c
    go t1 t2 p1 (Free (M m)) = lift m >>= \p2 -> go t1 t2 p1 p2
```

then, if upstream is yielding and downstream is waiting, we can feed the yielded value to the downstream pipe and continue from there:

if downstream is waiting and upstream is running a monadic computation, just let upstream run and keep downstream waiting:

if upstream terminates while downstream is waiting, finalize downstream:

but if downstream awaits again, terminate the whole composite pipe:
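The four cases just described can be sketched as further clauses of `go`, sitting between the downstream-only clauses and the both-waiting clauses. This is a reconstruction from the prose, not necessarily the original code:

```haskell
    -- upstream yields, downstream waits: hand the value over
    go t1 t2 (Free (Yield x c)) (Free (Await k)) = go t1 t2 c (k (Just x))
    -- upstream runs an effect, downstream keeps waiting
    go t1 t2 (Free (M m)) p2@(Free (Await _)) =
      lift m >>= \p1 -> go t1 t2 p1 p2
    -- upstream has terminated: finalize downstream once with Nothing
    go t1 False p1@(Pure _) (Free (Await k)) = go t1 True p1 (k Nothing)
    -- downstream awaits again after being finalized: stop the composite
    -- pipe, returning the upstream result
    go _ True (Pure r) (Free (Await _)) = return r
```

Returning the upstream result in the last clause is possible because the type of `>+>` requires both pipes to share the return type `r`.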

now, if both pipes are waiting, we keep the second pipe waiting and we feed whatever input we get to the first pipe. If the input is `Nothing`, we set the first boolean flag, so that next time the first pipe awaits, we can finalize the downstream pipe.

```
    go False t2 (Free (Await k)) p2@(Free (Await _)) =
      tryAwait >>= \x -> go (isNothing x) t2 (k x) p2
    go True False p1@(Free (Await _)) (Free (Await k)) =
      go True True p1 (k Nothing)
    go True True p1@(Free (Await _)) p2@(Free (Await _)) =
      tryAwait >>= \_ -> {- unreachable -} go True True p1 p2
```

This composition can be shown to be associative (in a rather strong sense), with identity given by:
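A plausible definition of that identity, assuming the `await` primitive described earlier (the body is a reconstruction; only the name `idP` is used elsewhere in this post):

```haskell
-- Forwards every input unchanged. It never returns on its own, so the
-- result type r is unconstrained, which the Category instance needs.
idP :: Monad m => Pipe a a m r
idP = forever $ await >>= yield
```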

So we can define a `Category` instance:

```
newtype PipeC m r a b = PipeC { unPipeC :: Pipe a b m r }
instance Monad m => Category (PipeC m r) where
  id = PipeC idP
  (PipeC p2) . (PipeC p1) = PipeC $ p1 >+> p2
```

## Running pipes

A **runnable pipe**, also called `Pipeline`, is a pipe that doesn’t yield any value and doesn’t wait for any input. We can formalize this in the types as follows:
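Judging from how `runPipe` feeds `Just ()` to `Await` and applies `absurd` to yielded values, the type synonym is presumably along these lines (a reconstruction, not necessarily the exact original):

```haskell
-- Input fixed to the unit type, output fixed to the uninhabited Void.
type Pipeline m r = Pipe () Void m r
```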

Disregarding bottom, calling `await` on such a pipe does not return any useful value, and yielding is impossible. Another way to think of `Pipeline` is as an arrow (in `PipeC`) from the terminal object to the initial object of Hask¹.

Running a pipeline is straightforward:

```
runPipe :: Monad m => Pipeline m r -> m r
runPipe (Pure r) = return r
runPipe (Free (M m)) = m >>= runPipe
runPipe (Free (Await k)) = runPipe $ k (Just ())
runPipe (Free (Yield x c)) = absurd x
```

where the impossibility of the last case is guaranteed by the types, unless of course the pipe introduced a bottom value at some point.

The three primitive operations `tryAwait`, `yield` and `lift`, together with pipe composition and the `runPipe` function above, are basically all we need to define most pipes and pipe combinators. For example, the simple pipe interpreter `evalPipe` can be easily rewritten in terms of these primitives:

```
evalPipe' :: Monad m => Pipe a b m r -> [a] -> m [b]
evalPipe' p xs = runPipe $
  (mapM_ yield xs >> return []) >+>
  (p >> discard) >+>
  collect id
  where
    collect xs =
      tryAwait >>= maybe (return $ xs [])
                         (\x -> collect (xs . (x:)))
```

Note that we use the `discard` pipe to turn the original pipe into an infinite one, so that the final return value will be taken from the final pipe.

## Extra combinators

The rich structure on pipes (category and monad) makes it really easy to define new higher-level combinators. For example, here are implementations of some of the combinators in Data.Conduit.List, translated to pipes:

```
sourceList = mapM_ yield
sourceNull = return ()
fold f z = go z
  where
    go x = tryAwait >>= maybe (return x) (go . f x)
consume = fold (\xs x -> xs . (x:)) id >>= \xs -> return (xs [])
sinkNull = discard
take n = (isolate n >> return []) >+> consume
drop n = replicateM n await >> idP
pipe f = forever $ await >>= yield . f -- called map in conduit
concatMap f = forever $ await >>= mapM_ yield . f
until p = go
  where
    go = await >>= \x -> if p x then return () else yield x >> go
groupBy (~=) = p >+>
    forever (until isNothing >+>
             pipe fromJust >+>
             (consume >>= yield))
  where
    -- the pipe p yields Nothing whenever the current item y
    -- and the previous one x do not satisfy x ~= y, and behaves
    -- like idP otherwise
    p = await >>= \x -> yield (Just x) >> go x
    go x = do
      y <- await
      unless (x ~= y) $ yield Nothing
      yield $ Just y
      go y
isolate n = replicateM_ n $ await >>= yield
filter p = forever $ until (not . p)
```

To work with the equivalent of sinks, it is useful to define a source to sink composition operator:

```
infixr 2 $$
($$) :: Monad m => Pipe () a m r' -> Pipe a Void m r -> m (Maybe r)
p1 $$ p2 = runPipe $ (p1 >> return Nothing) >+> liftM Just p2
```

which ignores the source return type, and just returns the sink return value, or `Nothing` if the source happens to terminate first. So we have, for example:

```
ex2 :: Maybe [Int]
ex2 = runIdentity $ sourceList [1..10] >+> isolate 4 $$ consume
{- ex2 == Just [1,2,3,4] -}
ex3 :: Maybe [Int]
ex3 = runIdentity $ sourceList [1..10] $$ discard
{- ex3 == Nothing -}
ex4 :: Maybe Int
ex4 = runIdentity $ sourceList [1,1,2,2,2,3,4,4]
                >+> groupBy (==)
                >+> pipe head
                 $$ fold (+) 0
{- ex4 == Just 10 -}
ex5 :: Maybe [Int]
ex5 = runIdentity $ sourceList [1..10]
                >+> filter (\x -> x `mod` 3 == 0)
                 $$ consume
{- ex5 == Just [3, 6, 9] -}
```

## Pipes in practice

You can find an implementation of guarded pipes in my fork of pipes. There is also a pipes-extra repository where you can find some pipes to deal with chunked `ByteStream`s and utilities to convert conduits to pipes.

I hope to be able to merge this into the original pipes package once the guarded pipe concept has proven its worth. Without the `tryAwait` primitive, combinators like `fold` and `consume` cannot be implemented, nor even a simple stateful pipe like one to split a chunked input into lines. So I think there are enough benefits to justify a little extra complexity in the definition of composition.

¹ In reality, Hask doesn’t have an initial object, and the terminal object is actually `Void`, because of non-strict semantics.