Mise en place
Our simulation has four parts: rolling, cooking, serving and storing.
We’re going to give it a couple of parameters: the number of rolls of dough to make and the number of jiaozi to serve.
It should output a serving bowl of jiaozi and the leftovers.
def sim(numberOfRolls: Int, jiaoziToServe: Int): (Bowl, Leftovers) = ???
To keep calculations simple, we’ll assume that exactly three jiaozi can be made from each roll.
If we make two rolls and serve four jiaozi, we should have a couple of jiaozi left over.
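The arithmetic behind that expectation can be sketched in plain Scala. The helper expectedLeftovers and the constant jiaoziPerRoll are made-up names for this illustration and are not part of the simulation itself:

```scala
// Hypothetical helper, for illustration only: counts leftovers without any streams.
val jiaoziPerRoll = 3 // the assumption above: exactly three jiaozi per roll

def expectedLeftovers(numberOfRolls: Int, jiaoziToServe: Int): Int = {
  val made = numberOfRolls * jiaoziPerRoll
  // We can't serve more than we made, so the count never goes negative.
  math.max(made - jiaoziToServe, 0)
}

val leftoverCount = expectedLeftovers(2, 4) // 2 * 3 - 4 = 2
```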
The data model
Let’s define some datatypes! The dough and jiaozi don’t really have any properties, so can be modeled simply as integer ids.
type Dough = Int
type Jiaozi = Int
type Bowl = List[Jiaozi]
type Leftovers = List[Jiaozi]
The rolling stage can be coded as a stream of dough. Cooking and serving are both transformations on that dough, so can be coded as pipes.
def roll(rollsToMake: Int): Stream[Pure, Dough] = ???
val cook: Pipe[Pure, Dough, Jiaozi] = ???
def serve(jiaoziToServe: Int): Pipe[Pure, Jiaozi, Jiaozi] = ???
Storing is a bit more tricky — we’ll get to it, but first let’s warm up by filling in these question marks. They can all be implemented using existing functions on streams.
Rolling
Rolling should produce a stream of dough with incremental ids, limited to the number of rolls to make. We can implement roll using any number of functions on the Stream datatype. Let’s go with iterate and take.
def roll(rollsToMake: Int): Stream[Pure, Dough] =
  Stream.iterate(0)(_ + 1).take(rollsToMake)
We can check it works by running the stream:
roll(2).compile.toList
// res0: List[Dough] = List(0, 1)
Cooking
We next want to split each roll of dough into three jiaozi, each with a unique integer id. Since we’re using integers to model both the dough and jiaozi, this involves some basic arithmetic.
val cook: Pipe[Pure, Dough, Jiaozi] = _.flatMap { dough =>
  Stream(
    dough * 3,
    dough * 3 + 1,
    dough * 3 + 2
  )
}
We should get three jiaozi per roll of dough, each with its own unique id. Let’s check:
roll(2).through(cook).compile.toList
// res1: List[Int] = List(0, 1, 2, 3, 4, 5)
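As a quick sanity check on the arithmetic, here is a sketch using plain Scala collections rather than fs2. The names cookIds, rolls and ids are hypothetical stand-ins used for this check only:

```scala
// Plain-collections model of the id arithmetic used by cook (not the fs2 pipe itself).
def cookIds(dough: Int): List[Int] =
  List(dough * 3, dough * 3 + 1, dough * 3 + 2)

val rolls = 4
val ids = (0 until rolls).toList.flatMap(cookIds)

// Every id is distinct, and together they cover 0 until rolls * 3 with no gaps.
assert(ids.distinct.size == ids.size)
assert(ids == (0 until rolls * 3).toList)
```

Multiplying by three spaces consecutive dough ids three apart, so the + 1 and + 2 offsets never collide with the next roll.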
Serving
To serve, we take a specified number of jiaozi from the stream. As a first shot we can try using the take function.
def serve(jiaoziToServe: Int): Pipe[Pure, Jiaozi, Jiaozi] =
  _.take(jiaoziToServe)
Let’s give it a spin:
roll(2).through(cook).through(serve(4)).compile.toList
// res2: List[Int] = List(0, 1, 2, 3)
Storing
Finally, we want to store the leftover jiaozi.
This can be coded in various ways. We’ll use a mutable store backed by a Cats Effect Ref.
type Box = Ref[IO, Leftovers]
A brief rundown of Ref
For our purposes, a Ref is a functional way of working with mutable state. You can think of it as a box containing a value. The value can be modified with an effect, which in our case is the Cats Effect IO.
We can create a box with no leftovers using Ref.of:
val emptyBox: IO[Box] = Ref.of(Nil)
val box: Box = emptyBox.unsafeRunSync()
We can update it using the aptly named update function. For instance, to add the fourth jiaozi to the box:
box.update(leftovers => 3 :: leftovers).unsafeRunSync()
Finally, we can extract the leftovers using get:
box.get.unsafeRunSync()
// res4: List[Jiaozi] = List(3)
A store pipe should add the leftover jiaozi to a box. We can check the box once we’ve finished running the stream.
def store(box: Box): Pipe[IO, Jiaozi, Nothing] = ???
Let’s take a closer look at that signature:
- The pipe has an IO effect. This is because adding jiaozi to the box is an effectful operation.
- The pipe outputs Nothing. Since all jiaozi are added to the box, there will never be values flowing out of the pipe.
We can implement this using the evalMap function, removing all outputs with drain.
def store(box: Box): Pipe[IO, Jiaozi, Nothing] =
  _.evalMap(jiaozi => box.update(jiaozi :: _))
    .drain
Let’s try it out:
{
  for {
    box <- emptyBox
    _ <- Stream(1, 2, 3).through(store(box)).compile.drain
    leftovers <- box.get
  } yield leftovers
}.unsafeRunSync()
// res5: List[Jiaozi] = List(3, 2, 1)
Which is just what we want.
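The jiaozi come back in reverse order because update prepends each one to the front of the list. A plain-Scala sketch of the same accumulation, with foldLeft standing in for the stream and the Ref (a simplification made purely for illustration, with hypothetical names incoming and boxContents):

```scala
// Model of what store does to the box: prepend each incoming jiaozi.
val incoming = List(1, 2, 3)
val boxContents = incoming.foldLeft(List.empty[Int]) { (leftovers, jiaozi) =>
  jiaozi :: leftovers
}
// boxContents: List[Int] = List(3, 2, 1)
```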
Putting it together
Our entire sim function will also be effectful. We’ll change the signature to reflect that.
def sim(numberOfRolls: Int, jiaoziToServe: Int): IO[(Bowl, Leftovers)] = ???
Finally, let’s try and write it:
def sim(numberOfRolls: Int, jiaoziToServe: Int): IO[(Bowl, Leftovers)] = {
  for {
    box <- emptyBox
    bowl = roll(numberOfRolls)
      .through(cook)
      .through(serve(jiaoziToServe))
      .compile
      .toList
    leftovers <- box.get
  } yield (bowl, leftovers)
}
Does that work?
sim(2, 4).unsafeRunSync()
// res6: Tuple2[Bowl, Leftovers] = (List(0, 1, 2, 3), List())
Unfortunately not — there are no leftovers.
The keen-eyed will notice that we’re not in fact storing any jiaozi. We haven’t plumbed our store pipe into the rest of the code.
There’s no easy way of doing this.
If you’re skeptical, have a go yourself. Can you make use of store without drastically changing the pipe signatures? If not, why?
The crux of the problem
The problem lies within our serve pipe. We’re using take to serve the jiaozi. take outputs a specified number of elements from a stream, but discards the rest. For our purposes, we do want to output elements, but we want to send the remaining jiaozi through a different store pipe. We need a function that looks like this:
def serveThen(
  n: Int,
  store: Pipe[IO, Jiaozi, Nothing]
): Pipe[IO, Jiaozi, Jiaozi] = ???
Let’s think a bit about how serveThen should behave. It should take a number of jiaozi from the stream, just like take, but should send the remaining jiaozi down the store pipe.
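One way to pin down that behaviour is a List-based model. This is only a specification sketch, assuming the whole batch fits in memory. The name serveThenModel is hypothetical, and the real serveThen has to achieve the same split incrementally on a stream:

```scala
// Specification sketch only: a List stands in for the stream of jiaozi.
def serveThenModel(n: Int, jiaozi: List[Int]): (List[Int], List[Int]) =
  jiaozi.splitAt(n) // (served, sent to the store)

val (served, stored) = serveThenModel(4, List(0, 1, 2, 3, 4, 5))
// served: List(0, 1, 2, 3)
// stored: List(4, 5)
```

Unlike take, nothing is thrown away here: the second half of the split is exactly what should reach the store pipe.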
We can string serveThen into the rest of our sim function as follows:
def sim(numberOfRolls: Int, jiaoziToServe: Int): IO[(Bowl, Leftovers)] = {
  for {
    box <- emptyBox
    bowl <- roll(numberOfRolls)
      .through(cook)
      .through(serveThen(jiaoziToServe, store(box)))
      .compile
      .toList
    leftovers <- box.get
  } yield (bowl, leftovers)
}
To write it, we’ll need to learn about fs2’s primitives. Read on to cook up some pulls.