Things which flow
Examples: events and data
More examples: databases and webservers
Even more examples: heartbeats and tests
Messages in a kafka topic
Key presses and mouse clicks in a game
fs2.io.stdinUtf8(bufferSize) .map(text => s"π $text") .through(fs2.io.stdoutLines()) .compile.drain
Lines βflowβ through the stream
A stream is composed of stages
A stage pulls on its upstream stage
The upstream stage may output an element
The stage may output an element downstream
fs2.io.stdinUtf8(bufferSize) // stdin stage .map(line => s"π $line") // map stage .through(fs2.io.stdoutLines()) // stdout stage .compile.drain
Events
input.txt
// My cats Mao // My friendsβs cats Popcorn
output.txt
π Mao π Popcorn
Files[IO] .readUtf8Lines(Path("input.txt")) .filter(s => !s.startsWith("//")) .map(line => s"π $line") .intersperse("\n") .through(text.utf8.encode) .through(Files[IO].writeAll(Path("output.txt")))
Files[IO] // upstream .readUtf8Lines(Path("input.txt")) .filter(s => !s.startsWith("//")) .map(line => s"π $line") // map .intersperse("\n") // downstream .through(text.utf8.encode) .through(Files[IO].writeAll(Path("output.txt")))
Data
Infinite loops
Incremental pipelines
Iterating over elements
create table catnames (name varchar)
table contents
"Mao" "Popcorn"
console
π Mao π Popcorn
val catNames = sql"select name from catnames" .query[String] .stream .transact(xa)
catNames .map(text => s"π $text\n") .through(fs2.io.stdoutLines()) .compile .drain
console
curl http://localhost:8080/Mao π Mao curl http://localhost:8080/Popcorn π Popcorn
val service = HttpRoutes.of[IO] { case GET -> Root / name => Ok(s"π $name") }.orNotFound
EmberServerBuilder .default[IO] .withHttpApp(service) .build .useForever
Stream.repeatEval(readNextRequest) // read .parEvalMapUnbounded(service.handle) // service .evalMap(writeResponse) // write .compile .drain
Concurrency
Effects
Concurrency
Resource management
console
π // 1 second passes π // 1 second passes π // 1 second passes
Stream.repeatEval(IO.println("π")) .spaced(1.second) .compile .drain
Time
def addCat(line: String): IO[String] = ???
test("Add π to Mao") { for { line <- addCat("Mao") } yield expect.eql("π Mao", line) } test("Ignore empty lines") { for { line <- addCat("") } yield expect.eql("", line) }
console
+ Add π to Mao - Ignore empty lines FAILURES - Ignore empty lines
case class Test(name: String, task: IO[Boolean]) case class TestResult(name: String, succeeded: Boolean)
def runTest(test: Test): IO[TestResult] = { for { succeeded <- test.task prefix = if (succeeded) "+" else "-" _ <- IO.println(s"$prefix ${test.name}") } yield TestResult(test.name, succeeded) }
Stream.emits(tests) .parEvalMapUnbounded(runTest) .compile .drain
val results: List[TestResult] = ... Stream .emits(results) .filterNot(_.succeeded) .evalMap(result => IO.println(s"- ${result.name}")) .compile .drain
Structuring effects
Control flow
50% off with code LSUG