Introducing pulls
Pulls are the plumbing underlying streams. All streams and stream transformations are built using pulls.
Perhaps the best way to understand them is to build a few ourselves.
Outputting values
Pulls can output values. For instance, take a stream that outputs a single "hello" value.
val helloStream = Stream("hello")
We could use the output1 function on the fs2.Pull object to build a pull that does the same.
val helloPull = Pull.output1("hello")
We can then convert it into a stream using the stream function.
helloPull.stream
Pulls are streams
The helloPull produces the same stream as helloStream. We can run them both to check:
helloStream.compile.toList
// res8: List[String] = List("hello")
helloPull.stream.compile.toList
// res9: List[String] = List("hello")
Empty streams
The simplest pull is the one that does nothing.
Pull.done.stream
This is equivalent to Stream.empty.
Streams are pulls
We can convert from a stream to a pull using a variety of functions. These are conveniently accessed under the pull function.
For instance, echo converts a stream to its corresponding pull representation:
helloStream.pull.echo
And of course, we can convert back and forth.
helloStream.pull.echo.stream.compile.toList
// res12: List[String] = List("hello")
helloStream.pull.echo.stream.pull.echo.stream.compile.toList
// res13: List[String] = List("hello")
helloStream.compile.toList
// res14: List[String] = List("hello")
Composing pulls
We can output multiple elements by composing pulls together using the >> function, spoken as ‘then’.
Our helloPull is sorely missing a "world". Let’s use >> to append it:
val helloWorldPull = helloPull >> Pull.output1("world")
This is the equivalent of helloStream ++ Stream("world").
helloWorldPull.stream.toList
// res15: List[String] = List("hello", "world")
(helloStream ++ Stream("world")).toList
// res16: List[String] = List("hello", "world")
This gives us a clue as to how to write stream transformations. With a bit of thought we can code this up as a pipe.
def world: Pipe[Pure, String, String] =
  in => (in.pull.echo >> Pull.output1("world")).stream
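To see the pipe in action, here is a small sketch that runs it over a one-element stream and over an empty one. The names greeting and lonely are made up for this illustration, and the imports assume a recent fs2 3.x release.

```scala
import fs2.{Pipe, Pull, Pure, Stream}

// The pipe from the text: echo the input, then output one extra "world".
def world: Pipe[Pure, String, String] =
  in => (in.pull.echo >> Pull.output1("world")).stream

// Appends "world" after whatever the input stream emits.
val greeting: List[String] = Stream("hello").through(world).compile.toList

// An empty input still gets the appended element.
val lonely: List[String] = Stream.empty.through(world).compile.toList
```

Because the extra element is appended with >>, it is only output once the echoed input has finished.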
Our mental picture of a pull is a bit like this:
Food for thought
These examples might seem simple, but they raise a lot of questions.
If pulls are just streams, why have them at all? Why not just have a single concept to represent the two?
We can build and compose streams using pulls, but how do we modify them? We can’t write map or take using the pulls we’ve seen so far.
That >> function hints that pulls are monads. Are they? If so, is the monad the same as the one for Stream?
We can tackle all of these by looking at a pull’s type parameters.
The result type
The Stream[F, O] datatype has two type parameters: an effect F and an output type O. On the other hand, Pull[F, O, R] has three. What is that extra R type for?
Unlike streams, pulls have a result.
The pulls you’ve seen so far all have a result type of Unit. To demonstrate, let’s examine the type of helloPull:
helloPull
// res17: Pull[Nothing, String, Unit] = Output(values = Chunk(hello))
It has an output type of String, as we expect, and a result of Unit.
We can build a pull with a more useful result using Pull.pure:
val helloResult = Pull.pure("hello")
// helloResult: Pull[Nothing, Nothing, String] = Succeeded(r = "hello")
This has an output type of INothing (printed as Nothing in the output above), meaning it doesn’t output any values, and a result of String.
To emphasise, the helloResult pull doesn’t output "hello" when run. In fact, if we try to convert it to a stream, we’ll find we can’t.
helloResult.stream
// error:
// value stream is not a member of fs2.Pull[Nothing, fs2.INothing, String]
A pull can only be represented as a stream if it has a result of Unit, meaning its result can be discarded.
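As a quick illustration of "the result can be discarded", the sketch below uses Pull's void method to swap the String result for Unit, after which the conversion compiles. The explicit type annotations and the names discarded and values are additions for this example, and the imports assume fs2 3.x.

```scala
import fs2.{Pull, Pure, Stream}

// A pull with a String result and no output, as in the text.
val helloResult: Pull[Pure, Nothing, String] = Pull.pure("hello")

// void replaces the result with Unit, so the stream conversion becomes available.
val discarded: Stream[Pure, String] = helloResult.void.stream

// The result was thrown away rather than output, so the stream emits nothing.
val values: List[String] = discarded.compile.toList
```

Note that "hello" never appears in the stream: discarding a result is not the same as outputting it.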
This brings us back to our original question: what is the result for?
Enter the Monad
Pulls are indeed monads. For me and many functional programming enthusiasts this is a subject for profound thought. But for our current purposes, this means they have a flatMap function. It looks a bit like this:
class Pull[F, O, R] {
  def flatMap[R1](f: R => Pull[F, O, R1]): Pull[F, O, R1] = ???
}
The function f passed to flatMap uses the result to create another pull.
We could pass in a function that creates a pull outputting the result.
val outputHello = helloResult.flatMap { (text: String) =>
  Pull.output1(text)
}
// outputHello: Pull[[x >: Nothing <: Any] => Nothing, String, Unit] = <function1>
The outputHello pull has a result type of Unit, so it can be converted to a stream.
outputHello.stream.compile.toList
// res18: List[String] = List("hello")
You can think of the result as a temporary value that can be used to construct other pulls.
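This also sheds some light on the >> function from earlier. The sketch below, with type annotations and names (viaThen, viaFlatMap, shouted) invented for the illustration, suggests that >> behaves like a flatMap that ignores the first pull's Unit result, and that a result can be transformed with map before anything is output. It assumes fs2 3.x.

```scala
import fs2.{Pull, Pure}

val helloPull: Pull[Pure, String, Unit] = Pull.output1("hello")

// Sequencing with >> ...
val viaThen: Pull[Pure, String, Unit] = helloPull >> Pull.output1("world")

// ... gives the same output as a flatMap that ignores the Unit result.
val viaFlatMap: Pull[Pure, String, Unit] = helloPull.flatMap(_ => Pull.output1("world"))

// A result can be transformed first and only output at the very end.
val helloResult: Pull[Pure, Nothing, String] = Pull.pure("hello")
val shouted: Pull[Pure, String, Unit] =
  helloResult.map(_.toUpperCase).flatMap(text => Pull.output1(text))

val thenList: List[String]    = viaThen.stream.compile.toList
val flatMapList: List[String] = viaFlatMap.stream.compile.toList
val shoutedList: List[String] = shouted.stream.compile.toList
```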
Modifying streams
We can convert a stream into a variety of pulls with meaningful results.
The most useful pull is one created by uncons1.
This creates a pull whose result is the first element of the stream, together with another stream of the remaining elements.
helloStream.pull.uncons1
// res19: Pull[Nothing, Nothing, Option[Tuple2[String, Stream[Nothing, String]]]] = <function1>
Let’s break down that hefty type signature:
The output type is INothing, meaning this pull doesn’t output values. Unlike echo, which would output all values in the original stream, no values are outputted.
The result is a mouthful of Option[(String, Stream[Nothing, String])]. It may contain a single value (the first value in the stream) and the remaining stream. If there are no more values to emit, the result is None. We’ll expand on this in just a moment.
Let’s try to use uncons1 to write some more interesting stream transformations. Suppose we want to write a take1 pipe to take the first element in a stream. We could start by using uncons1 and working with the result:
def take1: Pipe[Pure, String, String] = { in =>
  in.pull.uncons1.flatMap {
    ???
  }
}
The result is one of two things:
If the stream is non-empty, the result is a Some of a tuple of the first element and the rest of the stream. The first element is usually referred to as the “head” of the stream, so we’ll name this h. If the head is present we output it using Pull.output1.
If the stream is empty, the result is a None. We terminate with Pull.done.
def take1: Pipe[Pure, String, String] = { in =>
  in.pull.uncons1.flatMap {
    case Some((h, _)) => Pull.output1(h) // Ⓐ
    case None => Pull.done // Ⓑ
  }.stream
}
Let’s try it out:
helloStream.through(take1).compile.toList
// res20: List[String] = List("hello")
Stream.empty.through(take1).compile.toList
// res21: List[String] = List()
Stream("hello", "world").through(take1).compile.toList
// res22: List[String] = List("hello")
Updating our mental picture with results and uncons1 we have:
If you’re still a bit suspicious of uncons1 it might help to compare it to the act of pattern matching on a List. Pattern matching on a list has a non-empty case ::, which gives the first element and the rest of the list; and an empty case Nil. Gavin Bisesi’s excellent gist on pulls goes into this in detail.
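To make the comparison concrete, here is a side-by-side sketch. The take1List function is a hypothetical list counterpart written for this illustration; take1 is the pipe from above. The imports assume fs2 3.x.

```scala
import fs2.{Pipe, Pull, Pure, Stream}

// Keeping only the first element of a List by pattern matching.
def take1List(list: List[String]): List[String] =
  list match {
    case h :: _ => List(h) // non-empty: keep the head, ignore the tail
    case Nil    => Nil     // empty: nothing to keep
  }

// The same shape with uncons1: Some((head, tail)) plays the role of ::,
// and None plays the role of Nil.
def take1: Pipe[Pure, String, String] = { in =>
  in.pull.uncons1.flatMap {
    case Some((h, _)) => Pull.output1(h)
    case None         => Pull.done
  }.stream
}

val fromList: List[String]   = take1List(List("hello", "world"))
val fromStream: List[String] = Stream("hello", "world").through(take1).compile.toList
```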
Recursion
If you’re mathematically inclined, you’ll be bursting to write a recursive takeN function that takes a given number of elements. Go for it!
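If you'd like something to compare your attempt against, here is one possible sketch built only from uncons1, output1, >> and done. The helper name go and its remaining counter are choices made for this illustration, not part of fs2, and the imports assume fs2 3.x.

```scala
import fs2.{Pipe, Pull, Pure, Stream}

def takeN(n: Int): Pipe[Pure, String, String] = {
  // Output the head, then recurse on the rest with one fewer element to take.
  def go(s: Stream[Pure, String], remaining: Int): Pull[Pure, String, Unit] =
    if (remaining <= 0) Pull.done
    else
      s.pull.uncons1.flatMap {
        case Some((h, rest)) => Pull.output1(h) >> go(rest, remaining - 1)
        case None            => Pull.done // the stream ran out early
      }

  in => go(in, n).stream
}

val firstTwo: List[String] = Stream("a", "b", "c").through(takeN(2)).compile.toList
val tooMany: List[String]  = Stream("a", "b", "c").through(takeN(5)).compile.toList
```

The recursion has the same structure as a recursive take over a List: stop when the counter hits zero or when there is nothing left to uncons.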
If you aren’t too keen on the idea, fear not. fs2 has another handy pull that does this for us: pull.take.
helloStream.pull.take(1)
// res23: Pull[Nothing, String, Option[Stream[Nothing, String]]] = <function1>
Take a moment to examine that type signature.
It gives us a pull with an output type of String: it outputs the number of elements we ask it to take.
More interestingly, it results in an Option[Stream[Nothing, String]]. This represents the rest of the stream, which we can use to manipulate the remaining output.
Which is exactly what we need to write our serveThen pipe.
Serving jiaozi
The serveThen pipe should take a given number of jiaozi from the stream and store the rest.
def serveThen(n: Int, store: Pipe[IO, Jiaozi, Nothing]): Pipe[IO, Jiaozi, Jiaozi] = { in =>
  in.pull.take(n).flatMap {
    case Some(rest) => ???
    case None => Pull.done
  }.stream
}
If there are no elements left then we have nothing to do, so we can terminate with Pull.done. On the other hand, if there are still jiaozi left over, we want to pass them through the store pipe.
def serveThen(n: Int, store: Pipe[IO, Jiaozi, Nothing]): Pipe[IO, Jiaozi, Jiaozi] = { in =>
  in.pull.take(n).flatMap {
    case Some(rest) => rest.through(store) ???
    case None => Pull.done
  }.stream
}
We then compose the resulting stream with the rest of the code by converting it back to a pull.
def serveThen(n: Int, store: Pipe[IO, Jiaozi, Nothing]): Pipe[IO, Jiaozi, Jiaozi] = { in =>
  in.pull.take(n).flatMap {
    case Some(rest) => rest.through(store).pull.echo
    case None => Pull.done
  }.stream
}
And there we have it. Our pipe is complete.
Let’s try it out. As a reminder, here’s our final sim function.
def sim(numberOfRolls: Int, jiaoziToServe: Int): IO[(Bowl, Leftovers)] = {
  for {
    box <- emptyBox
    bowl <- roll(numberOfRolls)
      .through(cook)
      .through(serveThen(jiaoziToServe, store(box)))
      .compile
      .toList
    leftovers <- box.get
  } yield (bowl, leftovers)
}
If we give it a spin we do indeed see leftovers.
val (bowl, leftovers) = sim(2, 4).unsafeRunSync()
// bowl: List[Jiaozi] = List(0, 1, 2, 3)
// leftovers: List[Jiaozi] = List(5, 4)
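If you want to experiment with serveThen outside the full simulation, the sketch below is a minimal self-contained version. It makes several assumptions that are not taken from the simulation code: Jiaozi is modelled as a plain Int, the store pipe is a hypothetical stand-in that prepends each jiaozi to a Ref-backed box, the input is just Stream.range(0, 6) rather than rolled and cooked jiaozi, and the imports target cats-effect 3 with fs2 3.x.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.{Pipe, Pull, Stream}

// Assumption for this sketch: a jiaozi is identified by an Int.
type Jiaozi = Int

// Hypothetical stand-in for store: prepend each jiaozi to a box held in a Ref.
def store(box: Ref[IO, List[Jiaozi]]): Pipe[IO, Jiaozi, Nothing] =
  in => in.evalMap(jiaozi => box.update(jiaozi :: _)).drain

// The serveThen pipe from the text.
def serveThen(n: Int, store: Pipe[IO, Jiaozi, Nothing]): Pipe[IO, Jiaozi, Jiaozi] = { in =>
  in.pull.take(n).flatMap {
    case Some(rest) => rest.through(store).pull.echo
    case None       => Pull.done
  }.stream
}

// Serve four of six jiaozi and store whatever is left over.
val program: IO[(List[Jiaozi], List[Jiaozi])] =
  for {
    box       <- Ref.of[IO, List[Jiaozi]](Nil)
    bowl      <- Stream.range(0, 6).covary[IO].through(serveThen(4, store(box))).compile.toList
    leftovers <- box.get
  } yield (bowl, leftovers)

val (served, stored) = program.unsafeRunSync()
```

Because this stand-in store prepends to the list, the stored jiaozi come back in reverse order of arrival.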