Taking the plunge

Zainab Ali

https://kebab-ca.se/presentations.html

What are streams?

Things which flow

What are streams for?

Agenda

  • Examples: events and data

  • More examples: databases and webservers

  • Even more examples: heartbeats and tests

Events

  • Messages in a kafka topic

  • Key presses and mouse clicks in a game

fs2.io.stdinUtf8(bufferSize)
  .map(text => s"🐈 $text")
  .through(fs2.io.stdoutLines())
  .compile.drain

An intuition

Lines β€œflow” through the stream

The stage model

  • A stream is composed of stages

  • A stage pulls on its upstream stage

  • The upstream stage may output an element

  • The stage may output an element downstream

fs2.io.stdinUtf8(bufferSize)     // stdin stage
  .map(line => s"🐈 $line")      // map stage
  .through(fs2.io.stdoutLines()) // stdout stage
  .compile.drain

What are streams for?

Events

Data

input.txt

// My cats
Mao
// My friends’s cats
Popcorn

output.txt

🐈 Mao
🐈 Popcorn
Files[IO]
  .readUtf8Lines(Path("input.txt"))
  .filter(s => !s.startsWith("//"))
  .map(line => s"🐈 $line")
  .intersperse("\n")
  .through(text.utf8.encode)
  .through(Files[IO].writeAll(Path("output.txt")))
Files[IO]                                        // upstream
  .readUtf8Lines(Path("input.txt"))
  .filter(s => !s.startsWith("//"))
  .map(line => s"🐈 $line")                      // map
  .intersperse("\n")                             // downstream
  .through(text.utf8.encode)
  .through(Files[IO].writeAll(Path("output.txt")))

What are streams for?

Data

Events and data

  • Infinite loops

  • Incremental pipelines

  • Iterating over elements

Databases

create table catnames (name varchar)

table contents

"Mao"
"Popcorn"

console

🐈 Mao
🐈 Popcorn
val catNames = sql"select name from catnames"
            .query[String]
            .stream
            .transact(xa)
catNames
   .map(text => s"🐈 $text\n")
   .through(fs2.io.stdoutLines())
   .compile
   .drain

Webserver

console

curl http://localhost:8080/Mao
🐈 Mao
curl http://localhost:8080/Popcorn
🐈 Popcorn
val service = HttpRoutes.of[IO] {
  case GET -> Root / name => Ok(s"🐈 $name")
}.orNotFound
EmberServerBuilder
    .default[IO]
    .withHttpApp(service)
    .build
    .useForever
Stream.repeatEval(readNextRequest)       // read
    .parEvalMapUnbounded(service.handle) // service
    .evalMap(writeResponse)              // write
    .compile
    .drain

What are streams for?

Concurrency

Databases and webservers

  • Effects

  • Concurrency

  • Resource management

Heartbeat

console

🐈
// 1 second passes
🐈
// 1 second passes
🐈
// 1 second passes
Stream.repeatEval(IO.println("🐈"))
      .spaced(1.second)
      .compile
      .drain

What are streams for?

Time

Concurrent testing

def addCat(line: String): IO[String] = ???
test("Add 🐈 to Mao") {
  for {
    _    <- log.info("Adding 🐈 to Mao")
    line <- addCat("Mao")
    _    <- log.info("Added 🐈 to Mao")
  } yield expect.eql("🐈 Mao", line)
}

test("Ignore empty lines") {
  for {
     _   <- log.info("Adding 🐈 to empty line")
    line <- addCat("")
     _   <- log.info("Added 🐈 to empty line")
  } yield expect.eql("", line)
}

console

+ Add 🐈 to Mao
- Ignore empty lines
  Adding 🐈 to empty line
  Added 🐈 to empty line
Stream.emits(tests)
  .parEvalMapUnbounded(_.task)
  .evalMap(printLogs)
  .compile
  .drain
case class Test(name: String, task: IO[TestResult])

case class TestResult(name: String,
                      succeeded: Boolean,
                      logs: List[String])
def printLogs(result: TestResult): IO[Unit] = {
  val prefix = if (result.succeeded) "+" else "-"
  for {
    _ <- IO.println(s"$prefix ${result.name}")
    _ <- Stream.emits(result.logs)
           .evalMap(IO.println)
           .compile
           .drain
           .unlessA(result.succeeded)
  } yield ()
}

What are streams for?

Control flow

Where next?

Thank you!

50% off with code SCALADAYS