Aquascape

Zainab Ali

https://kebab-ca.se/presentations.html

def eat(cat: String): IO[String] =
  IO.println(s"$cat eats.").as(cat)

def nap(cat: String): IO[Unit] =
  IO.println(s"$cat naps.")
Stream("Mao", "Popcorn")
  .evalMap(eat)
  .evalMap(nap)
  .repeat

fs2

Operators

repeat

infinity

evalMap

side effects

merge

concurrency

Stream("Mao", "Popcorn")
  .evalMap(eat)
  .evalMap(nap)
  .repeat

1

Mao naps.
Mao eats.
Popcorn naps.
Popcorn eats.

2

Mao eats.
Mao naps.
Popcorn eats.
Popcorn naps.

3

Mao eats.
Popcorn eats.
Mao naps.
Popcorn naps.

Inspiration

Problems

Stream.iterate(1)(_ + 1) // 1 2 3 4 ...
  .take(2)               // 1 2
  .compile
  .count                 // 2

The stage operator

Stream.iterate(1)(_ + 1) // A stage
  .take(2)               // Another stage
  .compile.count         // The last stage
compile.count pulls on take(2)
take(2) pulls on Stream.iterate
Stream.iterate outputs 1 to take(2)
take(2) outputs 1 to compile.count
compile.count pulls on take(2)
take(2) pulls on Stream.iterate
Stream.iterate outputs 2 to take(2)
take(2) outputs 2 to compile.count
compile.count pulls on take(2)
take(2) is done
compile.count results in 2

fs2 model

def someOperator(in: Stream): Stream =
  def go(in: Stream): Pull
    in.pull    // Convert to the Pull datatype
      .uncons1 // Pull one element from upstream
      .flatMap {
      case Some((i, rest)) =>
        // ... do stuff ...
        Pull.output1(i) >> // Output the element
          go(rest)         // Recurse
      case None            =>
        Pull.pure(None)    // We’re done
      }
  go(in)
    .stream // Convert to the Stream datatype
def stage(name: String)(in: Stream): Stream =
  def go(in: Stream): Pull
    pullPrintln(s"Pulling on $name") >>
     in.pull    // Convert to the Pull datatype
      .uncons1  // Pull one element from upstream
      .flatMap {
      case Some((i, rest)) =>
        pullPrintln(s"Outputting $i from $name") >>
          Pull.output1(i) >> // Output the element
          go(rest)           // Recurse
      case None            =>
        pullPrintln(s"$name is done") >>
          Pull.pure(None)    // We’re done
      }
  go(in)
    .stream // Convert to the Stream datatype

The stage operator

Stream.iterate(1)(_ + 1)
  .stage("Stream.iterate")
  .take(2)
  .stage("take(2)")
  .compile
  .count
Pulling on take(2)
Pulling on Stream.iterate
Outputting 1 from Stream.iterate
Outputting 1 from take(2)
Pulling on take(2)
Pulling on Stream.iterate
Outputting 2 from Stream.iterate
Outputting 2 from take(2)
Pulling on take(2)
take(2) is done
//> using scala "3.7.0"
//> using dep "com.github.zainab-ali::aquascape::0.4.0"

import fs2.*
import cats.effect.*
import aquascape.*
import aquascape.drawing.*

object TakeScape extends AquascapeApp {
  def name: String = "takeScape"
  def stream(using Scape[IO]): IO[Unit] = {
    Stream.iterate(1)(_ + 1)
      .stage("Stream.iterate(…)")
      .take(2)
      .stage("take(2)")
      .compile
      .count
      .compileStage("compile.count")
      .void
  }
}
> scala TakeScape.scala

In the browser

> scala --power package -o App.js --js -f TakeScape.scala
<html>
  <head>
    <script src="App.js" type="text/javascript"></script>
  </head>
  <body>
    <div id="takeScape">
  </body>
</html>

Interactivity

Instant time

Thank you!

zainab-ali.github.io/aquascape

Functional Stream Processing in Scala, Gumroad, 50% off with KSUG