
Taking the plunge: a deep dive into streaming with fs2


This is a written version of my “Taking the plunge” talk, presented at Scala Days 2025. You can find the slides here.

What are streams?

The term is heavily overloaded in computing. We tend to think of streams as things which flow. In data processing, we think of streams of data flowing through an algorithm. In event-based systems, we think of a stream of events that we must handle, sometimes on-demand.

There are streams in the real world. If you watched this post’s corresponding presentation at Scala Days 2025, you could describe the process of coming into the conference room and sitting in your seat as a streaming problem. When you queue for coffee, along with many other conference attendees, you are part of a stream. As you’re now reading this blog post, you might check the time on your laptop, phone or watch. You might do this more often if you find the post boring, or are getting increasingly tired. That, too, could be described as a stream.

We say “could be described”, because it doesn’t have to be. A stream is one of many approaches to describing a problem.

When programming, we’d like to describe problems in the simplest way: a way that’s not only concise, but easy to understand.

This gives rise to another question:

What are streams for?

Streams are an excellent fit for some problems, but not for others. What makes streams a good fit?

To answer this question, we’ll think of streams differently. Instead of tools for event handling, or data processing, we’ll think of them as a method of structuring programs. In particular, they’re a method of structuring iterative programs.

Agenda

In this post, we’ll explore many examples of streaming problems. We’ll “stream” examples, if you will.

For each example, we’ll explore why streams are a good fit. Finally, we’ll combine all of these explorations to get to the core of what streams are for.

We’ll start with conventional examples of streams in the field of data processing and event handling, and develop a model of how these streams behave. We’ll then apply that model to solve other kinds of problems.

By the end, you’ll have a good understanding of streams, and be able to figure out whether streams are a good fit for your problem.

Events

First, let’s look at how streams are used for event handling.

Broadly speaking, an event is just something that happens. You don’t know when it will happen, or if it will ever occur, but when it does, you need to handle it. In event-driven architectures, you might subscribe to messages in a Kafka topic. Those messages can be thought of as events. In a game, you might receive user input, such as key presses and mouse clicks, that causes you to update what’s drawn on the screen.

Let’s take a look at a really basic event-handling program: an echo program.

Our program is a terminal application. We type something into the prompt, and the program prints it back to us, preceded by a cat emoji.

Before diving into streams, let’s consider different ways of solving this problem.

We could use a loop, such as while. This would work, but wouldn’t scale well with complexity: loops may be good for small algorithms, but they are notoriously difficult to write for larger, IO-based programs.

We could go for a functional approach and write a tail recursive function. This might be fun to write and a little easier to test, but even if you love writing recursive functions, reasoning about those recursive functions can be challenging.
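As a point of comparison, here is a hedged sketch of what a tail-recursive echo loop might look like using only the standard library. This is an illustration, not the approach the rest of this post takes, and the names decorate and echo are invented for this sketch.

```scala
import scala.annotation.tailrec
import scala.io.StdIn

// The pure transformation, separated out so it is easy to test.
def decorate(line: String): String = s"🐈 $line"

// A tail-recursive read/transform/print loop.
@tailrec
def echo(): Unit = {
  val line = StdIn.readLine()
  if (line != null) { // readLine returns null at end of input
    println(decorate(line))
    echo()
  }
}
```

Even in this small sketch, the control flow (when to read, when to stop) is tangled up with the transformation; the streaming version separates the two.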

Loops and recursive functions both come with their own problems. Let’s see how a stream fares.

fs2.io.stdinUtf8(bufferSize)
  .map(text => s"🐈 $text")
  .through(fs2.io.stdoutLines())
  .compile.drain

This stream is written using fs2, a functional streaming library in the Typelevel ecosystem. It gives us a Stream datatype, and many operators that work on it. Our stream turns into an IO, which we can run. If you’re familiar with IO, you’ll know how to run this code in an IOApp.

An intuition

How should we think about our stream?

At a high level, there are elements that “flow” through it: lines that get read from the input and somehow travel to the output.

Thinking this way gives us a broad picture of what our stream does, but it isn’t great for troubleshooting. When something in our stream goes wrong, and we’re trying to find the cause of the bug, we need to be able to reason through our program more concretely. We need a better model of how our stream runs.

We can reason through fs2 streams using the stage model.

The stage model

In the stage model we think of each operator as a stage. When the stream runs, the downstream stage pulls on the stage above. Pulling is a bit like requesting an element. If a stage receives an element, it may do some calculations, evaluate some side effects, and may output an element. Or it may indicate that it is done: there are no more elements to output.
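To make the pull mechanics concrete, here is a toy, pure-Scala model of stages. This is an illustrative analogy only; fs2’s actual implementation is chunked and effect-aware, and the names Stage, fromList, mapStage and drain are invented for this sketch.

```scala
// Toy model: a stage is something we can pull on; it either outputs an
// element or signals, with None, that it is done.
trait Stage[A] { def pull(): Option[A] }

// A source stage backed by a fixed list of elements.
def fromList[A](as: List[A]): Stage[A] = new Stage[A] {
  private var rest = as
  def pull(): Option[A] = rest match {
    case h :: t => rest = t; Some(h)
    case Nil    => None
  }
}

// A map stage: pulling on it pulls on the stage above, then transforms.
def mapStage[A, B](up: Stage[A])(f: A => B): Stage[B] =
  new Stage[B] { def pull(): Option[B] = up.pull().map(f) }

// Driving the pipeline: the downstream end pulls repeatedly until done.
def drain[A](s: Stage[A]): List[A] =
  Iterator.continually(s.pull()).takeWhile(_.isDefined).flatten.toList
```

Calling drain(mapStage(fromList(List("Mao", "Popcorn")))(n => s"🐈 $n")) pulls each name through the map stage one at a time, mirroring how a downstream stage drives the stages above it.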

We can consider our echo program to be composed of three stages: a stdin stage, a map stage and a stdout stage.

fs2.io.stdinUtf8(bufferSize)     // stdin stage
  .map(line => s"🐈 $line")      // map stage
  .through(fs2.io.stdoutLines()) // stdout stage
  .compile.drain

The final stdout stage pulls on the map stage. This pulls on the stdin stage, which evaluates a read effect and eventually outputs a line. The map stage outputs a line with an emoji. The final stdout stage writes this to the console, and pulls once again.

We can express this much more clearly in a diagram. Read it from bottom to top and left to right. An upwards arrow is a pull, and a downwards arrow is an output.

This mental model is very important. It lets us predict the behaviour of our code. When something in our program goes wrong, we can walk through the steps of execution and identify the bug.

What are streams for?

Streams allow us to describe event-handling in a different way. They allow us to define the sequence of operations for handling an event, without worrying about when the event happens.

We can predict the behaviour of our system when there are too many events. Our stream pulls only as fast as events can be processed, so events must be stored in a buffer upstream before they are pulled. Events will never be mysteriously dropped from within our streaming system.

Data

How might streams apply to data processing?

Instead of reading our cat names from stdin, let’s suppose that they are read from a file. The file contains the name of my cat, and the names of my friends’ cats. It might look something like this:

input.txt

// My cats
Mao
// My friends’ cats
Popcorn

I have many friends, and they have many cats, so this file is far too large to fit in memory.

We wish to skip comments, and output each cat name prepended by a cat emoji:

output.txt

🐈 Mao
🐈 Popcorn

We can do this using a stream.

Files[IO]
  .readUtf8Lines(Path("input.txt"))
  .filter(s => !s.startsWith("//"))
  .map(line => s"🐈 $line")
  .intersperse("\n")
  .through(text.utf8.encode)
  .through(Files[IO].writeAll(Path("output.txt")))
  .compile.drain

We can understand our program at a high level just by reading the code. We read lines from a file, filter out commented lines, then prepend a cat emoji to each line, add a newline character between them, encode them as UTF-8 and finally write them to an output file.

We can also get a detailed mental picture about how our stream evaluates by thinking of it in three stages. The upstream stage reads from the file, the map stage prepends the cat emoji, and the downstream stage writes to the output file.

Files[IO]                                        // upstream
  .readUtf8Lines(Path("input.txt"))
  .filter(s => !s.startsWith("//"))
  .map(line => s"🐈 $line")                      // map
  .intersperse("\n")                             // downstream
  .through(text.utf8.encode)
  .through(Files[IO].writeAll(Path("output.txt")))
  .compile.drain

This diagram looks surprisingly similar to that for our terminal application. The only difference is that, unlike our terminal application, our file processing program terminates.

What are streams for?

Streams are a great fit for data processing because they allow us to define incremental pipelines. These pipelines operate on chunks of data that are small enough to fit in memory. Under the hood, they behave in exactly the same way as event processing pipelines.
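As a loose analogy for incremental processing (not fs2 itself), the standard library’s LazyList shows how a pipeline can describe a transformation without materialising the whole input up front. The sample data here stands in for a file too large for memory.

```scala
// A lazily-evaluated source standing in for a very large file.
val lines: LazyList[String] =
  LazyList("// My cats", "Mao", "// My friends’ cats", "Popcorn")

// The same pipeline shape as the fs2 version: elements are only
// evaluated as they are demanded downstream.
val out: LazyList[String] =
  lines
    .filter(s => !s.startsWith("//"))
    .map(line => s"🐈 $line")
```

Unlike fs2, LazyList has no notion of effects or chunking, but the shape of the pipeline, and its on-demand evaluation, is the same idea.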

Events and data

Our goal is to find out what problems streams are a good fit for. We’ve seen that streams are good for event-handling, when reading input from the terminal, and data-processing, when reading lines from a file. What links these two problems?

Perhaps it’s termination. In the event-handling case, our program never terminated. It would keep running, even if we never typed anything. In the data case, our program terminated, but only after fully reading the file.

Or perhaps it’s iteration. Both problems iterate over elements and evaluate side effects in the process. Iteration gives us an idea of where next to apply streams.

Databases

Let’s take a look at some other examples of iteration over elements. What about results returned by a database query?

Suppose we have a table of cat names.

create table catnames (name varchar)

table contents

"Mao"
"Popcorn"

We wish to read the names in the table and print them to our console prepended by a cat emoji.

console

🐈 Mao
🐈 Popcorn

Assuming our table is too large to fit in memory, we’ll need to stream the cat names. Here’s how we might construct a stream with doobie, a database library in the Typelevel ecosystem.

val catNames = sql"select name from catnames"
  .query[String]
  .stream
  .transact(xa)

The value catNames is an fs2 stream. It can be composed with other operators to build a bigger program.

catNames
  .map(text => s"🐈 $text\n")
  .through(fs2.io.stdoutLines())
  .compile
  .drain

When we read from a database, we’re solving a problem that isn’t clearly a data or event problem. Our query returns a large volume of data, and so can be thought of as a data processing problem, but if our database is very slow, we might think of the query result as an event.

Regardless of how we think about it, both data processing and event-handling scenarios look the same through the lens of the stage model.

Webserver

A webserver is another example of iterating over elements.

Suppose we have a service that prepends a cat emoji onto the name we give it.

console

curl http://localhost:8080/Mao
🐈 Mao
curl http://localhost:8080/Popcorn
🐈 Popcorn

We can write this service with http4s, an HTTP library in the Typelevel ecosystem.

val service = HttpRoutes.of[IO] {
  case GET -> Root / name => Ok(s"🐈 $name")
}.orNotFound

EmberServerBuilder
  .default[IO]
  .withHttpApp(service)
  .build
  .useForever

You may be wondering: where is the stream?

Under the hood, http4s uses fs2 to construct a stream of requests. It looks something like this:

Stream.repeatEval(readNextRequest)      // read
  .parEvalMapUnbounded(service.handle)  // service
  .evalMap(writeResponse)               // write
  .compile
  .drain

Our stream reads requests, handles them, then writes responses.

It looks similar to the streams we’ve seen so far, but there’s a key difference: our service handles multiple requests at a time. In other words, it handles requests concurrently.

This is clearer to see in a diagram.

There are several requests being handled at the same time. We can think of this as several “threads of execution”:

  • The service stage pulls on the read stage sequentially.

  • The write stage pulls on the service stage sequentially.

  • The service stage handles requests concurrently.

What are streams for?

Concurrent programs are notoriously difficult to write and troubleshoot. With fs2 streams, however, we’re able to add concurrency simply by using the parEvalMapUnbounded operator. More importantly, we can reason through our concurrent program by thinking of it as several different sequences of pulls and outputs.

Databases and webservers

In all of our examples, we’ve iterated over elements. In our terminal application, the elements were lines typed by the user. In our data processing pipeline, they were lines read from a file. In our database problem, they were cat names resulting from a query, and in our webserver they were cat names extracted from requests.

But streams can apply to problems where the element of iteration is not important.

Heartbeat

As an example, let’s consider a heartbeat process. Our hearts beat for cats, so our program will print a cat emoji to the console each second.

console

🐈
// 1 second passes
🐈
// 1 second passes
🐈
// 1 second passes

We can describe this as a stream.

Stream.repeatEval(IO.println("🐈"))
      .spaced(1.second)
      .compile
      .drain

The “element” that flows through this stream is the result of IO.println, which is in fact just the unit value ().

Here, we’re using a stream to structure side effects. The side effect in particular is a time delay.

Just as with our other stream programs, we can reason about it in terms of the stage model. In our diagram, the side effect evaluation is shown in a box in the bottom row.

Even though our heartbeat problem isn’t typically thought of as a stream processing pipeline, it can be easily expressed using streams.

Concurrent testing

We can also use streams to structure complex concurrent programs.

For example, suppose we wanted to test an addCat function. Our function should prepend a cat emoji to a line, but leave empty lines unchanged.

def addCat(line: String): IO[String] = ???
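The post leaves addCat unimplemented. For illustration only, the pure core of such a function might look like the sketch below; addCatCore is an invented name, and the real addCat would wrap this logic in a slow, effectful IO, which is exactly why the tests should run concurrently.

```scala
// Hypothetical pure core of addCat: prepend the emoji unless the line
// is empty, in which case leave it unchanged.
def addCatCore(line: String): String =
  if (line.isEmpty) line else s"🐈 $line"
```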

addCat takes a long time to run, so our tests that call it should run concurrently.

We can achieve this using weaver, a testing framework in the Typelevel ecosystem. Our tests might look like this:

test("Add 🐈 to Mao") {
  for {
    _    <- log.info("Adding 🐈 to Mao")
    line <- addCat("Mao")
    _    <- log.info("Added 🐈 to Mao")
  } yield expect.eql("🐈 Mao", line)
}

test("Ignore empty lines") {
  for {
    _    <- log.info("Adding 🐈 to empty line")
    line <- addCat("")
    _    <- log.info("Added 🐈 to empty line")
  } yield expect.eql("", line)
}

Troubleshooting concurrent tests can be tricky. Suppose the tests “Add 🐈 to Mao” and “Ignore empty lines” are run at the same time. Their log messages would ordinarily be interleaved in the console output, making them confusing to read.

Troubleshooting would be easier if each test was printed to the console along with its own log output:

console

+ Add 🐈 to Mao
- Ignore empty lines
  Adding 🐈 to empty line
  Added 🐈 to empty line

weaver does exactly that. It runs tests concurrently, but prints their output sequentially.

We can do this in fs2 with just three operators:

Stream.emits(tests)
  .parEvalMapUnbounded(_.task)
  .evalMap(printLogs)
  .compile
  .drain

In more depth, we can model a Test as a side-effecting program, or IO, that results in a TestResult with aggregated logs.

case class Test(name: String, task: IO[TestResult])

case class TestResult(name: String,
                      succeeded: Boolean,
                      logs: List[String])

If we wanted, we could implement our printLogs function using a stream.

def printLogs(result: TestResult): IO[Unit] = {
  val prefix = if (result.succeeded) "+" else "-"
  for {
    _ <- IO.println(s"$prefix ${result.name}")
    _ <- Stream.emits(result.logs)
           .evalMap(IO.println)
           .compile
           .drain
           .unlessA(result.succeeded)
  } yield ()
}

What are streams for?

We’ve seen that streams are a good fit for event handling and data processing problems. They’re used when working with user input, files, databases and webservers.

They iterate over elements, which could be events, or chunks of data, but could also be more abstract tasks, or even just unit values.

We’ve seen that they can be used to structure more general side-effecting programs, such as our heartbeat program, and concurrent programs such as our tests.

At their core, streams are a method of describing control flow. In other words, they’re a mechanism for structuring programs.

They’re a good fit for programs with iteration, side effects, concurrency and time. But they can be applied to any problem that lends itself to a pull-based approach.

Where next?

I’ve only shown you a small fragment of what fs2 streams can do, but I hope you’ve got a taste of how useful they can be.

Next time you encounter a complex problem — one that might involve recursion, loops, side effects or concurrency — try and solve it with a stream. You’ll be surprised at what interesting applications they can have.

You can learn more about control flow from Fabio Labella’s excellent presentation on Declarative Control Flow with fs2.

If you’re interested in the diagrams, check out the aquascape project, or join me at Lambda World 2025.

If you’d like to learn more about streams, have a read of my book on Functional Stream Processing in Scala.