Pipes

Primitives

StateT
newtype StateT s m a = StateT {
    runStateT :: s -> m (a, s)
}
Free Monad
data Free f a = Free (f (Free f a)) | Pure a

liftF :: Functor f => f a -> Free f a
Void

is the inhabited type and denote a closed output

Proxy

Pipes defines a single type Proxy which is a monad transformer:

 (Proxy p) => p a' a b' b m r

  Upstream | Downstream
     +---------+
     |         |
 a' <==       <== b'
     |  Proxy  |
 a  ==>   m   ==> b
     |         |
     +----|----+
          r
type Effect = Proxy X () () X
runEffect :: (Monad m) => Effect m r -> m r

Effect is a proxy that never yield or wait. The default API exposes a pull-based unidirectional flow.

Producer

A Producer is a monad transformer that extends any base monad with a yield command. yield emits a value, suspending the current Producer until the value is consumed. If nobody consumes the value (which is possible) then yield never returns.

type Producer b m r = Proxy X () () b m r
     +---------+
     |         |
Void <==       <== ()
     |  Proxy  |
 ()  ==>       ==> b
     |         |
     +---------+
yield :: (Monad m) => b -> Producer' b m ()

for :: (Monad m)
    =>       Proxy x' x b' b m a'
    -> (b -> Proxy x' x c' c m b')
    ->       Proxy x' x c' c m a'


-- "into" compose the bodies of `for`
(~>) :: (Monad m)
     => (a -> Producer b m r)
     -> (b -> Producer c m r)
     -> (a -> Producer c m r)
(f ~> g) x = for (f x) g
~> and yield form a Category ("Generator") where yield is the identity.

With for you consume every element of a Producer the exact same way. If this is not suitable, use next or a Consumer.

Think of next as pattern matching on the head of the Producer. This Either returns a Left if the Producer is done or it returns a Right containing the next value, a, along with the remainder of the Producer:

next :: Monad m => Producer a m r -> m (Either r (a, Producer a m r))

Consumer

A consumer represents an "exhaustible" (it may refuse to accept new values) and possibly effectful sink of values. An example of an exhaustible sink is toOutput from pipes-concurrency, which will terminate if the Output it writes to has been sealed.

await blocks waiting for a new value. If nobody provides it (which is possible) then await never returns.

type Consumer a = Proxy () a () X
     +---------+
     |         |
 () <==       <== ()
     |  Proxy  |
 a  ==>       ==> Void
     |         |
     +---------+
await :: Monad m => Consumer' a m a
(>~)

Repeatedly feeds await in the consumer with the action passed as the first parameter. This allows consumer composition

Examples
runEffect $ lift getLine >~ stdoutLn
        +- Feed             +- Consumer to      +- Returns new
        |  action           |  feed             |  Effect
        v                   v                   v
(>~) :: Effect m b       -> Consumer b m c   -> Effect m c
(>~) :: Consumer a m b   -> Consumer b m c   -> Consumer a m c
(>~) :: Producer y m b   -> Pipe     b y m c -> Producer   y m c
(>~) :: Pipe     a y m b -> Pipe     b y m c -> Pipe     a y m c
(>~) and await form a Category where await is the identity.

Pipe

type Pipe a b = Proxy () a () b
     +---------+
     |         |
 () <==       <== ()
     |  Proxy  |
 a  ==>       ==> b
     |         |
     +---------+
(>->) :: Monad m => Producer a m r -> Consumer a m r -> Effect m r
(>->) :: Monad m => Producer a m r -> Pipe   a b m r -> Producer b m r
(>->) :: Monad m => Pipe   a b m r -> Consumer b m r -> Consumer a m r
(>->) :: Monad m => Pipe   a b m r -> Pipe   b c m r -> Pipe   a c m r

cat :: (Monad m) => Pipe a a m r
cat = forever $ do
    x <- await
    yield x
(>→) and cat form a Category where cat is the identity.

Bidirectional API

The response category
yield = respond
for = (//>)
(~>) = (/>/)
The reply category
await = request ()

Lift

StateP

Run StateT in the base monad of the Proxy passed as a second argument.

runStateP
    :: (Monad m)
    => s -- state (usually of type proxy)
    -> Proxy a' a b' b (S.StateT s m) r
    -> Proxy a' a b' b m (r, s)
Example
-- !! this return a Producer a m (Maybe r, Producer a m r) !!
-- This makes sense you are actually running the StateT monad from Producer a (StateT (Producer a m r) m r) r
-- r is either Just which means the original Producer is empty or Nothing which mean you should go on drawing from the original Producer
-- The top producer accumulates your split, then you have a pair of a Maybe r and your original Producer

runStateP p $ do -- p will be used to feed the underlying proxy
    -- entering a monad of the form: (Proxy (<- StateT monad <- Proxy))
    -- All computation happens inside the underlying monad that is initially fed up by the param p
    x <- lift draw -- lift the next value of the underlying proxy
    case x of -- Left if the underlying proxy is empty or Right with the drawn element
        Left  r -> return (Just r)
        Right a -> do
            yield a -- push `a onto the top proxy
            (Just <$> input) >-> (Nothing <$ takeWhile (== a))  -- start streaming values from the underlying proxy
                                                                --

Concurrent API

You have got a mailbox !

(output, input) <- spawn Unbounded
producer >-> (consumer) output >...> input (producer) >-> consumer

Send to the mailbox using toOutput output (output is able to sent mail). So toOutput transforms the output into a consumer. Read from the mailbox using fromInput input (input is able to receive mail). So fromInput transforms the input into a producer.

newtype Input a = Input { recv :: S.STM (Maybe a) }

Pipes-Handle

Pipes-handle models the input/output stream analogy. An output stream accepts bytes (you write into it) whereas you read from an inputstream. The proxy that can "read from" in the pipes ecosystem is the consumer. By analogy, an output stream accepts output bytes and sends them to some sink. So you write into an output stream.

Stream

Pipes-Parse

Parser

Parser is like Consumers but with the ability to keep the leftover

type Parser a m r = forall x . StateT (Producer a m x) m r

draw :: (Monad m) => Parser a m (Maybe a)

runStateT  :: Parser a m r -> Producer a m x -> m (r, Producer a m x)
evalStateT :: Parser a m r -> Producer a m x -> m  r
execStateT :: Parser a m r -> Producer a m x -> m (   Producer a m x)
Lenses

Lenses served as transformation in both directions.

splitAt
    :: Monad m
    => Int
    -> Lens' (Producer a m x) (Producer a m (Producer a m x))
zoom

Connect lenses to Parsers

zoom
    :: Lens' (Producer a m x) (Producer b m y)
    -> Parser b m r
    -> Parser a m r

Iso': don’t provide them if there is error messages involved in encoding and decoding. Stick to Lens'

Pipes-Group

FreeT nests each subsequent Producer within the return value of the previous Producer so that you cannot access the next Producer until you completely drain the current Producer.

split / transform / join paradigm

-- A "splitter" such as `groupBy`, `chunksOf` or `splitOn`
Producer a m ()           -> FreeT (Producer a m) m ()  ~   [a]  -> [[a]]

-- A "transformation" such as `takeFree`
FreeT (Producer a m) m () -> FreeT (Producer a m) m ()  ~  [[a]] -> [[a]]

-- A "joiner" such as `concat` or `intercalate`
FreeT (Producer a m) m () -> Producer a m ()            ~  [[a]] ->  [a]

Errors management

Empty Bytestring

If you want to transform a Producer of ByteString into another Producer, for instance of csv records, be careful to be immune of empty bytestring chunks. Indeed pipes-bytestring operations don’t guarantee that they won’t drop empty bytestring chunks or create new ones.

-- first take the next elem of the source
x <- lift (next source)
        case x of
            Left () -> feedParser (k B.empty) (return ())
            Right (bs, source') ->
                if (B.null bs)
                then continue k source'
                else feedParser (k bs) source'

Managed

You have a resource a that can be acquired and then released.

-- | A @(Managed a)@ is a resource @(a)@ bracketed by acquisition and release
newtype Managed a = Manage
    { -- | Consume a managed resource
      with :: forall x . (a -> IO x) -> IO x
    }
Resource ((forall b. IO b -> IO b) -> IO (Allocated a))

Arrows and push based pipe

Events are discrete ← PUSH based.
Behaviors are continuous ← PULL based

ArrowChoice corresponds to concurrency and Arrow corresponds to parallelism

Controller/Model/View

Controller

Represent concurrent effectful inputs to your system. A controller is really just a synonym for an Input from pipes-concurrency. So you have this function:

producer :: Buffer a -> Producer a IO () -> Managed (Controller a)
Model

A pure streaming transformation from the combined controller to the combined views. You can test this pure kernel by swapping out controllers with predicable inputs.

asPipe :: Pipe a b (State s) () -> Model s a b
View

Handles all effectful outputs from the model.

asSink :: (a -> IO ()) -> View aa
Run it
runMVC ::
  initialState
  -> Model s a b
  -> Managed (View b, Controller a)
  -> IO s

Questions

type Producer b =                    Proxy Void () () b
type Producer' b m r = forall x' x . Proxy x' x () b m r