Aquascape

Zainab Ali

https://kebab-ca.se/presentations.html

fs2

Operators

repeat

infinity

evalMap

side effects

merge

concurrency

def eat(cat: String): IO[String] =
  IO.println(s"$cat eats.").as(cat)

def nap(cat: String): IO[Unit] =
  IO.println(s"$cat naps.")
Stream("Mao", "Popcorn")
  .evalMap(eat)
  .evalMap(nap)
  .repeat
Stream("Mao", "Popcorn")
  .evalMap(eat)
  .evalMap(nap)
  .repeat

1

Mao naps.
Mao eats.
Popcorn naps.
Popcorn eats.

2

Mao eats.
Mao naps.
Popcorn eats.
Popcorn naps.

3

Mao eats.
Popcorn eats.
Mao naps.
Popcorn naps.

Inspiration

Agenda

  • Mental model

  • Draw the model

  • Make it interactive

Mental model

Stream("Mao", "Popcorn")
  .evalMap(eat)
  .evalMap(nap)

The stage model

Stream("Mao", "Popcorn") // catNames stage
  .evalMap(eat)          // eat stage
  .evalMap(nap)          // nap stage
nap pulls on eat
eat pulls on catNames
catNames outputs "Mao" to eat
"Mao eats" is evaluated
eat outputs "Mao" to nap
"Mao naps" is evaluated
nap pulls on eat
eat pulls on catNames
catNames outputs "Popcorn" to eat
"Popcorn eats" is evaluated
eat outputs "Popcorn" to nap
"Popcorn naps" is evaluated
nap pulls on eat
eat pulls on catNames
catNames is done
eat is done
nap is done

Pull model

Pull

Pull.uncons1

Output

Pull.output1

Done

Pull.done
def someOperator(in: Stream): Stream =
  def go(in: Stream): Pull =
    in.pull    // Convert to the Pull datatype
      .uncons1 // Pull an element
      .flatMap {
        case Some((el, rest)) =>
          // ... evaluate ...
          Pull.output1(el) >> // Output the element
            go(rest)          // Recurse
      case None            =>
          Pull.done    // We’re done
      }
  go(in)
    .stream // Convert to the Stream datatype

The stage operator

def stage(stageName: String)(in: Stream): Stream =
  def go(in: Stream): Pull =
    pullPrintln(s"Pulling on $stageName") >>
     in.pull
      .uncons1
      .flatMap {
      case Some((i, rest)) =>
        pullPrintln(s"Outputting $i from $stageName") >>
          Pull.output1(i) >>
          go(rest)
      case None            =>
        pullPrintln(s"$stageName is done") >>
          Pull.pure(None)
      }
  go(in)
    .stream
Stream("Mao", "Popcorn")
  .stage("catNames")
  .evalMap(eat)
  .stage("eat")
  .evalMap(nap)
  .stage("nap")
Pulling on nap
Pulling on eat
Pulling on catNames
Outputting "Mao" from catNames
Outputting "Mao" from eat
Outputting "Mao" from nap
Pulling on nap
Pulling on eat
Pulling on catNames
Outputting "Popcorn" from catNames
Outputting "Popcorn" from eat
Outputting "Popcorn" from nap
Pulling on nap
Pulling on eat
Pulling on catNames
catNames is done
eat is done
nap is done
//> using scala "3.7.0"
//> using dep "com.github.zainab-ali::aquascape::0.4.0"

import fs2.*
import cats.effect.*
import aquascape.*

object CatLifeScape extends AquascapeApp {
  def name: String = "catlife"
  def stream(using Scape[IO]): IO[Unit] = {
    Stream("Mao", "Popcorn")
      .stage("catNames")
      .evalMap(eat)
      .stage("eat")
      .evalMap(nap)
      .compile
      .count
      .compileStage("nap")
      .void
  }
}
> scala CatLifeScape.scala

Interactivity

> scala --power package -o App.js --js -f CatLifeScape.scala
<html>
  <head>
    <script src="App.js" type="text/javascript"></script>
  </head>
  <body>
    <div id="catLifeScape">
  </body>
</html>
def takeN(n: Int)(using Scape[IO]): IO[Unit] =
  Stream('a', 'b').repeat
    .stage("Stream(a, b).repeat")
    .take(n)
    .stage(s"take($n)")
    .compile
    .toList
    .compileStage("compile.toList")

Interactivity

Instant time

val stream: IO[Picture] = ...
import cats.effect.*
import cats.effect.testkit.*

TestControl.executeEmbed(stream)

Thank you!

zainab-ali.github.io/aquascape

Functional Stream Processing in Scala, Gumroad, 50% off with LSUG