Pipes
Primitives
newtype StateT s m a = StateT {
    runStateT :: s -> m (a, s)
}
data Free f a = Free (f (Free f a)) | Pure a
liftF :: Functor f => f a -> Free f a
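To make the Free definition above concrete, here is a minimal sketch that uses only base. The instruction functor `Emit` and the names `emit`, `collect` and `program` are hypothetical and are not part of pipes; they only illustrate how liftF turns one instruction into a monadic action that an interpreter can later walk.

```haskell
-- The Free type from the notes, plus the instances it needs to be a monad.
data Free f a = Free (f (Free f a)) | Pure a

instance Functor f => Functor (Free f) where
    fmap g (Pure a)  = Pure (g a)
    fmap g (Free fa) = Free (fmap (fmap g) fa)

instance Functor f => Applicative (Free f) where
    pure = Pure
    Pure g  <*> x = fmap g x
    Free fg <*> x = Free (fmap (<*> x) fg)

instance Functor f => Monad (Free f) where
    Pure a  >>= k = k a
    Free fa >>= k = Free (fmap (>>= k) fa)

liftF :: Functor f => f a -> Free f a
liftF = Free . fmap Pure

-- Hypothetical instruction set: emit one value, then continue.
data Emit b next = Emit b next

instance Functor (Emit b) where
    fmap g (Emit b next) = Emit b (g next)

emit :: b -> Free (Emit b) ()
emit b = liftF (Emit b ())

-- Pure interpreter: collect every emitted value and the final result.
collect :: Free (Emit b) r -> ([b], r)
collect (Pure r)             = ([], r)
collect (Free (Emit b next)) = let (bs, r) = collect next in (b : bs, r)

program :: Free (Emit Int) String
program = do
    emit 1
    emit 2
    return "done"
```

Evaluating `collect program` in GHCi walks the two `Emit` layers and then the final `Pure`.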
X (a synonym for Void) is the uninhabited type and denotes a closed output.
Proxy
Pipes defines a single type Proxy
which is a monad transformer:
Proxy a' a b' b m r

Upstream | Downstream
    +---------+
    |         |
a' <==       <== b'
    |  Proxy  |
a  ==>   m   ==> b
    |    |    |
    +----|----+
         r
type Effect = Proxy X () () X
runEffect :: (Monad m) => Effect m r -> m r
An Effect is a Proxy that never yields or awaits. The default API exposes a pull-based unidirectional flow.
Producer
A Producer is a monad transformer that extends any base monad with a yield command. yield emits a value, suspending the current Producer until the value is consumed. If nobody consumes the value (which is possible) then yield never returns.
type Producer b m r = Proxy X () () b m r

      +---------+
      |         |
Void <==       <== ()
      |  Proxy  |
()   ==>       ==> b
      |         |
      +---------+
yield :: (Monad m) => b -> Producer' b m ()
for :: (Monad m)
    => Proxy x' x b' b m a'
    -> (b -> Proxy x' x c' c m b')
    -> Proxy x' x c' c m a'
-- "into": composes the bodies of `for` loops
(~>) :: (Monad m)
    => (a -> Producer b m r)
    -> (b -> Producer c m r)
    -> (a -> Producer c m r)
(f ~> g) x = for (f x) g
~> and yield form a Category ("Generator") where yield is the identity.
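A small sketch of `for` and `(~>)` in use, assuming the pipes package is available. `duplicate`, `doubled` and `twiceTwice` are hypothetical names chosen for illustration; `each` and `Pipes.Prelude.toList` are used only to build and inspect the stream.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical loop body: re-yield every element twice.
duplicate :: Monad m => a -> Producer a m ()
duplicate x = yield x >> yield x

-- `for` substitutes the loop body for every `yield` in the producer.
doubled :: Monad m => Producer Int m ()
doubled = for (each [1, 2, 3]) duplicate

-- (~>) composes two loop bodies into one.
twiceTwice :: Monad m => a -> Producer a m ()
twiceTwice = duplicate ~> duplicate

-- yield is the identity of (~>), so this behaves like `duplicate`.
sameAsDuplicate :: Monad m => a -> Producer a m ()
sameAsDuplicate = yield ~> duplicate
```

In GHCi, `P.toList doubled` and `P.toList (twiceTwice 'x')` show the resulting streams as plain lists.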
With for you consume every element of a Producer in exactly the same way. If this is not suitable, use next or a Consumer.
Think of next as pattern matching on the head of the Producer. It returns a Left if the Producer is done, or a Right containing the next value, a, along with the remainder of the Producer:
next :: Monad m => Producer a m r -> m (Either r (a, Producer a m r))
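The pattern-matching view of `next` can be sketched as a manual loop. `drain` is a hypothetical helper, not a pipes function; it is only meant to show both branches of the Either.

```haskell
import Pipes

-- Hypothetical helper: walk a Producer with `next`, collecting every
-- element together with the Producer's final return value.
drain :: Monad m => Producer a m r -> m ([a], r)
drain p = do
    step <- next p
    case step of
        -- The Producer is done and hands back its return value.
        Left r -> return ([], r)
        -- One element plus the rest of the Producer.
        Right (a, rest) -> do
            (as, r) <- drain rest
            return (a : as, r)
```

For example, `drain (each [1, 2, 3] >> return "end")` can be run in any base monad, including IO.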
Consumer
A Consumer represents an "exhaustible" (it may refuse to accept new values) and possibly effectful sink of values. An example of an exhaustible sink is toOutput from pipes-concurrency, which will terminate if the Output it writes to has been sealed.
await blocks waiting for a new value. If nobody provides it (which is possible) then await never returns.
type Consumer a = Proxy () a () X

    +---------+
    |         |
() <==       <== ()
    |  Proxy  |
a  ==>       ==> Void
    |         |
    +---------+
await :: Monad m => Consumer' a m a
(>~) repeatedly feeds every await in the consumer with the action passed as its first argument. This allows consumers to be composed:
runEffect $ lift getLine >~ stdoutLn
        +- Feed       +- Consumer to    +- Returns new
        |  action     |  feed           |  Effect
        v             v                 v
(>~) :: Effect m b -> Consumer b m c -> Effect m c
(>~) :: Consumer a m b -> Consumer b m c -> Consumer a m c
(>~) :: Producer y m b -> Pipe b y m c -> Producer y m c
(>~) :: Pipe a y m b -> Pipe b y m c -> Pipe a y m c
(>~) and await form a Category where await is the identity.
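A sketch of feeding a Consumer with `(>~)`, assuming the pipes package. `sumTwo`, `sumFour`, `tick` and `fedSum` are hypothetical names; the IORef counter stands in for any effectful action such as `getLine`.

```haskell
import Data.IORef
import Pipes

-- Hypothetical consumer: await two numbers and return their sum.
sumTwo :: Monad m => Consumer Int m Int
sumTwo = do
    x <- await
    y <- await
    return (x + y)

-- Composing consumers: every await of the right-hand sumTwo is replaced
-- by a whole run of the left-hand sumTwo, so four values are awaited.
sumFour :: Monad m => Consumer Int m Int
sumFour = sumTwo >~ sumTwo

-- Hypothetical feed action: bump a counter and return its new value.
tick :: IORef Int -> Effect IO Int
tick ref = lift $ do
    modifyIORef' ref (+ 1)
    readIORef ref

-- Every await in sumTwo is answered by running `tick` once.
fedSum :: IORef Int -> Effect IO Int
fedSum ref = tick ref >~ sumTwo
```

With a counter starting at 0, `runEffect (fedSum ref)` feeds the values 1 and 2 into `sumTwo`.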
Pipe
type Pipe a b = Proxy () a () b

    +---------+
    |         |
() <==       <== ()
    |  Proxy  |
a  ==>       ==> b
    |         |
    +---------+
(>->) :: Monad m => Producer a m r -> Consumer a m r -> Effect m r
(>->) :: Monad m => Producer a m r -> Pipe a b m r -> Producer b m r
(>->) :: Monad m => Pipe a b m r -> Consumer b m r -> Consumer a m r
(>->) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
cat :: (Monad m) => Pipe a a m r
cat = forever $ do
    x <- await
    yield x
(>->) and cat form a Category where cat is the identity.
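A short sketch of pull-based composition with `(>->)`, assuming the pipes package. `pairUp`, `pairs` and `pipeline` are hypothetical names; `P.filter`, `P.map` and `P.take` come from Pipes.Prelude.

```haskell
import Control.Monad (forever)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical pipe written directly with await and yield:
-- group the incoming elements two by two.
pairUp :: Monad m => Pipe a (a, a) m r
pairUp = forever $ do
    x <- await
    y <- await
    yield (x, y)

-- The pipeline stops as soon as the upstream Producer is exhausted,
-- so a trailing unpaired element is dropped.
pairs :: Monad m => Producer (Int, Int) m ()
pairs = each [1 .. 5] >-> pairUp

-- Pull-based flow: P.take decides when the pipeline ends, so the
-- infinite source is only consumed as far as needed.
pipeline :: Monad m => Producer Int m ()
pipeline = each [1 ..] >-> P.filter even >-> P.map (* 10) >-> P.take 3
```

Appending `>-> cat` to either pipeline leaves its output unchanged, which is the identity law stated above.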
Lift
runStateP runs StateT in the base monad of the Proxy passed as its second argument.
runStateP
    :: (Monad m)
    => s -- initial state (here usually a Producer)
    -> Proxy a' a b' b (S.StateT s m) r
    -> Proxy a' a b' b m (r, s)
-- !! This returns a Producer a m (Maybe r, Producer a m r) !!
-- This makes sense: you are actually running the StateT layer of Producer a (StateT (Producer a m r) m) (Maybe r)
-- The result is either Just r, which means the original Producer is empty, or Nothing, which means you should go on drawing from the original Producer
-- The top Producer accumulates your split; you then get a pair of a Maybe r and your original Producer
runStateP p $ do -- p will be used to feed the underlying proxy
    -- Entering a monad of the form: (Proxy (<- StateT monad <- Proxy))
    -- All computation happens inside the underlying monad, which is initially fed by the parameter p
    x <- lift draw -- lift the next value of the underlying proxy
    case x of -- Left if the underlying proxy is empty, or Right with the drawn element
        Left r -> return (Just r)
        Right a -> do
            yield a -- push `a` onto the top proxy
            (Just <$> input) >-> (Nothing <$ takeWhile (== a)) -- start streaming values from the underlying proxy
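For a smaller illustration of running a StateT layer underneath a Proxy, here is a sketch that uses `evalStateP` from Pipes.Lift (the variant of runStateP that discards the final state). `numbered` and `numberedLetters` are hypothetical names, and the Int counter is an assumption made for the example rather than the Producer state used above.

```haskell
import Control.Monad (forever)
import Control.Monad.Trans.State.Strict (get, put)
import Pipes
import Pipes.Lift (evalStateP)
import qualified Pipes.Prelude as P

-- Hypothetical pipe: tag every element with a running index.
-- The counter lives in a StateT layer below the Pipe and is
-- eliminated by evalStateP, so callers see a plain Pipe.
numbered :: Monad m => Pipe a (Int, a) m r
numbered = evalStateP 0 $ forever $ do
    x <- await
    n <- lift get
    lift (put (n + 1))
    yield (n, x)

numberedLetters :: [(Int, Char)]
numberedLetters = P.toList (each "abc" >-> numbered)
```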
Concurrent API
You have got a mailbox!
(output, input) <- spawn Unbounded
producer >-> (consumer) output >...> input (producer) >-> consumer
Send to the mailbox using toOutput output (output is able to send mail). So toOutput transforms the output into a Consumer.
Read from the mailbox using fromInput input (input is able to receive mail). So fromInput transforms the input into a Producer.
newtype Input a = Input { recv :: S.STM (Maybe a) }
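A sketch of a round trip through a mailbox, assuming the pipes and pipes-concurrency packages. It uses `spawn'`, which also returns an STM action that seals the mailbox explicitly, and the lowercase `unbounded` buffer that recent pipes-concurrency releases export. `roundTrip` is a hypothetical name.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Pipes
import Pipes.Concurrent (fromInput, spawn', toOutput, unbounded)
import qualified Pipes.Prelude as P

-- Hypothetical helper: push a list through a mailbox from a writer
-- thread and read everything back on the calling thread.
roundTrip :: [a] -> IO [a]
roundTrip xs = do
    (output, input, seal) <- spawn' unbounded
    _ <- forkIO $ do
        -- Writer side: toOutput turns the Output into a Consumer.
        runEffect $ each xs >-> toOutput output
        -- Sealing lets the reader terminate once the buffer is drained.
        atomically seal
    -- Reader side: fromInput turns the Input into a Producer.
    P.toListM (fromInput input)
```

Sealing explicitly is a design choice made for this sketch: with plain `spawn`, the reader only stops once the Output has been garbage collected.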
Pipes-Handle
Pipes-handle models the input/output stream analogy. You read from an input stream, whereas an output stream accepts bytes and sends them to some sink, so you write into it. The proxy that can "read from" in the pipes ecosystem is the Consumer.
Pipes-Parse
A Parser is like a Consumer, but with the ability to keep leftovers.
type Parser a m r = forall x . StateT (Producer a m x) m r
draw :: (Monad m) => Parser a m (Maybe a)
runStateT :: Parser a m r -> Producer a m x -> m (r, Producer a m x)
evalStateT :: Parser a m r -> Producer a m x -> m r
execStateT :: Parser a m r -> Producer a m x -> m ( Producer a m x)
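A sketch of how a Parser keeps its leftovers, assuming the pipes and pipes-parse packages. `drawTwo` and `parseAndRest` are hypothetical names; runStateT is the ordinary strict StateT runner from transformers.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Monad.Trans.State.Strict (runStateT)
import Pipes
import Pipes.Parse (Parser, draw)
import qualified Pipes.Prelude as P

-- Hypothetical parser: draw (at most) two elements from the input.
drawTwo :: Monad m => Parser a m (Maybe a, Maybe a)
drawTwo = do
    x <- draw
    y <- draw
    return (x, y)

-- Run the parser on a list-backed Producer, then drain whatever the
-- parser left behind to show that nothing was lost.
parseAndRest :: Monad m => [a] -> m ((Maybe a, Maybe a), [a])
parseAndRest xs = do
    (result, leftover) <- runStateT drawTwo (each xs)
    rest <- P.toListM leftover
    return (result, rest)
```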
Lenses serve as transformations in both directions.
splitAt
    :: Monad m
    => Int
    -> Lens' (Producer a m x) (Producer a m (Producer a m x))
Connect lenses to Parsers
zoom
    :: Lens' (Producer a m x) (Producer b m y)
    -> Parser b m r
    -> Parser a m r
Iso': don't provide one if error messages are involved in encoding and decoding. Stick to Lens'.
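The two directions of a lens such as splitAt can be sketched without a lens library by applying the lens directly to `Const` (to view) and `Identity` (to modify), both from base. This is only an illustration under that assumption; in practice one would normally use `view`, `over` and `zoom` from a lens library. `firstN` and `scaleFirstN` are hypothetical names.

```haskell
import Data.Functor.Const (Const (..))
import Data.Functor.Identity (Identity (..))
import Pipes
import Pipes.Parse (splitAt)
import qualified Pipes.Prelude as P
import Prelude hiding (splitAt)

-- Forward direction (view): a Producer of the first n elements that
-- returns the remainder of the stream when it finishes.
firstN :: Monad m => Int -> Producer a m x -> Producer a m (Producer a m x)
firstN n p = getConst (splitAt n Const p)

-- Both directions (over): split, transform only the first n elements,
-- then let the lens join the result back into a single Producer.
scaleFirstN :: Monad m => Int -> Producer Int m x -> Producer Int m x
scaleFirstN n p = runIdentity (splitAt n (Identity . (>-> P.map (* 10))) p)
```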
Pipes-Group
FreeT nests each subsequent Producer within the return value of the previous Producer so that you cannot access the next Producer until you completely drain the current Producer.
split / transform / join paradigm
-- A "splitter" such as `groupBy`, `chunksOf` or `splitOn`
Producer a m () -> FreeT (Producer a m) m () ~ [a] -> [[a]]
-- A "transformation" such as `takeFree`
FreeT (Producer a m) m () -> FreeT (Producer a m) m () ~ [[a]] -> [[a]]
-- A "joiner" such as `concat` or `intercalate`
FreeT (Producer a m) m () -> Producer a m () ~ [[a]] -> [a]
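A sketch of the split / transform / join paradigm, assuming the pipes and pipes-group packages. In pipes-group the splitters are lenses, so this example views through `chunksOf` by applying it to `Const` from base, which is an assumption made to avoid a lens-library dependency. `firstChunks` and `chunkSums` are hypothetical names.

```haskell
import Data.Functor.Const (Const (..))
import Pipes
import Pipes.Group (chunksOf, concats, folds, takes)
import qualified Pipes.Prelude as P

-- Split into chunks of n, keep only the first k chunks, join again.
firstChunks :: Monad m => Int -> Int -> Producer a m () -> Producer a m ()
firstChunks k n p = concats (takes k (getConst (chunksOf n Const p)))

-- Split into chunks of n and fold each chunk down to its sum.
chunkSums :: Monad m => Int -> Producer Int m () -> Producer Int m ()
chunkSums n p = folds (+) 0 id (getConst (chunksOf n Const p))

firstSix :: [Int]
firstSix = P.toList (firstChunks 2 3 (each [1 .. 10]))

sums :: [Int]
sums = P.toList (chunkSums 3 (each [1 .. 10]))
```

Each chunk is streamed rather than materialised, so neither function holds a whole group in memory.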
Error management
Empty Bytestring
If you want to transform a Producer of ByteString into another Producer, for instance of CSV records, make sure your code is robust to empty ByteString chunks.
Indeed
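One way to guard a downstream decoder against empty chunks is to filter them out first. This is a minimal sketch, assuming the pipes and bytestring packages; `dropEmptyChunks` and `cleaned` are hypothetical names.

```haskell
import qualified Data.ByteString.Char8 as BC
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical pipe: forward only the non-empty chunks.
dropEmptyChunks :: Monad m => Pipe BC.ByteString BC.ByteString m r
dropEmptyChunks = P.filter (not . BC.null)

-- Example input with an empty chunk in the middle.
cleaned :: [BC.ByteString]
cleaned =
    P.toList (each [BC.pack "a,b", BC.empty, BC.pack "\n"] >-> dropEmptyChunks)
```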
Managed
You have a resource a that can be acquired and then released.
-- | A @(Managed a)@ is a resource @(a)@ bracketed by acquisition and release
newtype Managed a = Manage
    { -- | Consume a managed resource
      with :: forall x . (a -> IO x) -> IO x
    }
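To see the acquire / use / release ordering, here is a self-contained sketch that redefines the Managed newtype above with base only. `managedBracket` and `demo` are hypothetical names, and the IORef log stands in for a real resource.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Exception (bracket)
import Data.IORef

newtype Managed a = Manage
    { with :: forall x . (a -> IO x) -> IO x }

-- Hypothetical helper: build a Managed value from an acquire action
-- and a release action, using bracket so release always runs.
managedBracket :: IO a -> (a -> IO ()) -> Managed a
managedBracket acquire release = Manage (bracket acquire release)

-- Record the order in which acquisition, use and release happen.
demo :: IO [String]
demo = do
    logRef <- newIORef []
    let say msg = modifyIORef logRef (++ [msg])
        resource =
            managedBracket
                (say "acquire" >> return (42 :: Int))
                (\_ -> say "release")
    with resource (\n -> say ("use " ++ show n))
    readIORef logRef
```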
Resource ((forall b. IO b -> IO b) -> IO (Allocated a))
Arrows and push based pipe
Events are discrete ← PUSH based.
Behaviors are continuous ← PULL based
ArrowChoice corresponds to concurrency and Arrow corresponds to parallelism.
Controller/Model/View
Controller: represents concurrent effectful inputs to your system. A Controller is really just a synonym for an Input from pipes-concurrency. So you have this function:
producer :: Buffer a -> Producer a IO () -> Managed (Controller a)
Model: a pure streaming transformation from the combined controllers to the combined views. You can test this pure kernel by swapping out controllers with predictable inputs.
asPipe :: Pipe a b (State s) () -> Model s a b
View: handles all effectful outputs from the model.
asSink :: (a -> IO ()) -> View a
runMVC
    :: s -- initial state
    -> Model s a b
    -> Managed (View b, Controller a)
    -> IO s
Questions
type Producer b = Proxy Void () () b
type Producer' b m r = forall x' x . Proxy x' x () b m r