Taking the plunge

Zainab Ali

https://kebab-ca.se/presentations.html

What are streams?

Things which flow

What are streams for?

Agenda

  • Examples: events and data

  • More examples: databases and webservers

  • Even more examples: heartbeats and tests

Events

  • Messages in a kafka topic

  • Key presses and mouse clicks in a game

fs2.io.stdinUtf8(bufferSize)
  .map(text => s"🐈 $text")
  .through(fs2.io.stdoutLines())
  .compile.drain

An intuition

Lines β€œflow” through the stream

The stage model

  • A stream is composed of stages

  • A stage pulls on its upstream stage

  • The upstream stage may output an element

  • The stage may output an element downstream

fs2.io.stdinUtf8(bufferSize)     // stdin stage
  .map(line => s"🐈 $line")      // map stage
  .through(fs2.io.stdoutLines()) // stdout stage
  .compile.drain

What are streams for?

Events

Data

input.txt

// My cats
Mao
// My friends’s cats
Popcorn

output.txt

🐈 Mao
🐈 Popcorn
Files[IO]
  .readUtf8Lines(Path("input.txt"))
  .filter(s => !s.startsWith("//"))
  .map(line => s"🐈 $line")
  .intersperse("\n")
  .through(text.utf8.encode)
  .through(Files[IO].writeAll(Path("output.txt")))
Files[IO]                                        // upstream
  .readUtf8Lines(Path("input.txt"))
  .filter(s => !s.startsWith("//"))
  .map(line => s"🐈 $line")                      // map
  .intersperse("\n")                             // downstream
  .through(text.utf8.encode)
  .through(Files[IO].writeAll(Path("output.txt")))

What are streams for?

Data

Events and data

  • Infinite loops

  • Incremental pipelines

  • Iterating over elements

Databases

create table catnames (name varchar)

table contents

"Mao"
"Popcorn"

console

🐈 Mao
🐈 Popcorn
val catNames = sql"select name from catnames"
            .query[String]
            .stream
            .transact(xa)
catNames
   .map(text => s"🐈 $text\n")
   .through(fs2.io.stdoutLines())
   .compile
   .drain

Webserver

console

curl http://localhost:8080/Mao
🐈 Mao
curl http://localhost:8080/Popcorn
🐈 Popcorn
val service = HttpRoutes.of[IO] {
  case GET -> Root / name => Ok(s"🐈 $name")
}.orNotFound
EmberServerBuilder
    .default[IO]
    .withHttpApp(service)
    .build
    .useForever
Stream.repeatEval(readNextRequest)       // read
    .parEvalMapUnbounded(service.handle) // service
    .evalMap(writeResponse)              // write
    .compile
    .drain

What are streams for?

Concurrency

Databases and webservers

  • Effects

  • Concurrency

  • Resource management

Heartbeat

console

🐈
// 1 second passes
🐈
// 1 second passes
🐈
// 1 second passes
Stream.repeatEval(IO.println("🐈"))
      .spaced(1.second)
      .compile
      .drain

What are streams for?

Time

Concurrent testing

def addCat(line: String): IO[String] = ???
test("Add 🐈 to Mao") {
  for {
    line <- addCat("Mao")
  } yield expect.eql("🐈 Mao", line)
}

test("Ignore empty lines") {
  for {
    line <- addCat("")
  } yield expect.eql("", line)
}

console

+ Add 🐈 to Mao
- Ignore empty lines
FAILURES
- Ignore empty lines
case class Test(name: String, task: IO[Boolean])
case class TestResult(name: String, succeeded: Boolean)
def runTest(test: Test): IO[TestResult] = {
      for {
        succeeded <- test.task
        prefix = if (succeeded) "+" else "-"
        _ <- IO.println(s"$prefix ${test.name}")
      } yield TestResult(test.name, succeeded)
    }
Stream.emits(tests)
  .parEvalMapUnbounded(runTest)
  .compile
  .drain
val results: List[TestResult] = ...

Stream
   .emits(results)
   .filterNot(_.succeeded)
   .evalMap(result => IO.println(s"- ${result.name}"))
   .compile
   .drain

What are streams for?

Structuring effects

What are streams for?

Control flow

Where next?

Thank you!

50% off with code LSUG