Why have thread pools?
A thread pool, also known as an execution context, is a way of managing parallelism.
To demonstrate, let’s have a look at a simple task: `snooze`.

```scala
val snooze: IO[Unit] = IO(Thread.sleep(2000L))
```

`snooze` does absolutely nothing. More precisely, it does absolutely nothing for two seconds. We can double check this by running it using our handy `time` function:
```scala
time(snooze).unsafeRunSync()
// error: Could not find an implicit IORuntime.
//
// Instead of calling unsafe methods directly, consider using cats.effect.IOApp, which
// runs your IO. If integrating with non-functional code or experimenting in a REPL / Worksheet,
// add the following import:
//
// import cats.effect.unsafe.implicits.global
//
// Alternatively, you can create an explicit IORuntime value and put it in implicit scope.
// This may be useful if you have a pre-existing fixed thread pool and/or scheduler which you
// wish to use to execute IO programs. Please be sure to review thread pool best practices to
// avoid unintentionally degrading your application performance.
```
Whoops! We need an `IORuntime`. Let’s use our own `basicRuntime` explicitly:
```scala
time(snooze).unsafeRunSync()(basicRuntime)
// res1: String = "The task took 2 seconds."
```
As expected, it took two seconds to run.
What if we have multiple `snooze` tasks?
```scala
val snoozes: List[IO[Unit]] = List(snooze, snooze)
```
We can combine a list of tasks using `parSequence`:
```scala
val parallelSnoozes: IO[Unit] = snoozes.parSequence.void
```
The `parSequence` function produces an `IO` that runs multiple tasks in parallel.
```scala
time(parallelSnoozes).unsafeRunSync()(basicRuntime)
// res2: String = "The task took 2 seconds."
```
Both tasks were run at the same time, so the total elapsed time was still only two seconds.
If you’re used to parallel computations, you may look at `parSequence` with a degree of suspicion. It lets us run many tasks in parallel, but how many?
For instance, we can declare a thousand `snooze` tasks:
```scala
val lotsOfSnoozes = List.fill(1000)(snooze).parSequence.void
```
Will they really only take two seconds?
```scala
time(lotsOfSnoozes).unsafeRunSync()(basicRuntime)
// res3: String = "The task took 2 seconds."
```
Nice! There seems to be no upper limit on the tasks we can run in parallel.
Knowing our limits
Unlimited parallelism seems like a great idea, but it has significant downsides. Let’s have a look at a different task to demonstrate.
```scala
val factorial: IO[Unit] = {
  @scala.annotation.tailrec
  def go(n: Long, total: Long): Long =
    if (n > 1) go(n - 1, total * n) else total

  IO(go(2000000000L, 1)).void
}
```
Woah! That’s an odd bit of code.
If you’re a functional programming enthusiast, you’re probably so fond of factorials that you compute them in your sleep.
Those skills, elegant as they are, aren’t too important here. Don’t worry if you’ve never heard of a factorial, haven’t seen `@scala.annotation.tailrec` before, or get a headache reading this Escher-like code.
The key part of `factorial` is that, unlike `snooze`, it does a lot of multiplication. Running this on my rusty old laptop takes approximately two seconds.
```scala
time(factorial).unsafeRunSync()(basicRuntime)
// res4: String = "The task took 2 seconds."
```
The functional programmer within you might point out that this code is pure: there’s no reason to wrap it in an `IO`. While that’s true, doing so lets us parallelize it with `parSequence`:
```scala
val tenFactorials: IO[Unit] = List.fill(10)(factorial).parSequence.void
```
It took two seconds to run `factorial` once, so running ten in parallel should also take two seconds, shouldn’t it?
```scala
time(tenFactorials).unsafeRunSync()(basicRuntime)
// res5: String = "The task took 3 seconds."
```
If you ran the code above, you probably felt your laptop heat up a bit. You might have also found that the code didn’t take two seconds — it took longer.
This is different to our `snooze` task, which always took two seconds regardless of whether we ran one, two or a thousand in parallel.
Why would that be?
To answer that question, we need to take a closer look at our computers.
The processor beneath
Despite what we might wish, our laptops are not magical boxes with unlimited compute power: they’re made of physical devices, and those devices have limits. A computer has a limited number of processors, each of which can compute one thing at once.
We can check that number in Scala by taking a look at the `Runtime` object:
```scala
val numProcessors = Runtime.getRuntime().availableProcessors()
// numProcessors: Int = 8
```
My humble laptop has eight processors: it can execute a maximum of eight computations at once. Even if I ask it to calculate ten factorials in parallel, it won’t actually do so.
You might rightly wonder: why didn’t we hit this limit for the `snooze` task? This is because the `Thread.sleep` operation in `snooze` didn’t occupy a processor as it ran.
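We can see the same behaviour outside of cats-effect with a plain JDK thread pool. This is an illustrative sketch, not the book’s setup code: `Executors.newCachedThreadPool` creates threads on demand, so it behaves like an unbounded pool, and a hundred sleeping tasks finish in roughly the time of a single sleep.

```scala
import java.util.concurrent.Executors

// An effectively unbounded pool: new threads are created on demand.
val pool = Executors.newCachedThreadPool()

val start = System.nanoTime()
val futures = List.fill(100)(pool.submit(new Runnable {
  def run(): Unit = Thread.sleep(200L)
}))
futures.foreach(_.get()) // wait for all hundred sleeps to finish
val elapsedMs = (System.nanoTime() - start) / 1000000L

// All hundred sleeps overlap, so the total is close to one 200ms sleep:
// a sleeping thread occupies no processor.
pool.shutdown()
```

The hundred threads all sleep simultaneously, even on an eight-processor machine, because sleeping costs no compute.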
Setting our limits
We can take a closer look at how our `factorial` task is getting run by timing each task:
```scala
val timedFactorial: IO[String] = time(factorial)

val timedFactorials: IO[List[String]] = List.fill(20)(timedFactorial).parSequence
```
This gives a string description for each of the twenty tasks corresponding to how long the task took to run.
```scala
timedFactorials.unsafeRunSync()(basicRuntime)
// res6: List[String] = List(
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 5 seconds.",
//   "The task took 5 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 4 seconds.",
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds."
// )
```
Strange! We see times of anywhere between four and six seconds, far longer than the two seconds a single task takes.
All tasks are fired off at the same time, but our processors switch between them as they run. A processor might start computing a task, but put it on hold in order to compute a different one, switching back to it at a later time. Tasks are started, halted and restarted as they all compete for processors.
The more tasks we parallelize, the more switching each processor has to do. This is problematic for a few reasons:
- Switching between tasks is expensive: a processor has to unload all the information associated with the computation it’s about to pause, and reload the information for the next.
- A paused computation still has resources hanging around. Our `factorial` task doesn’t need too much memory, but we could easily write a task that used a lot of heap space. Running too many memory-intensive computations would give us `OutOfMemoryError` exceptions.
It’s generally much more useful to limit the number of tasks that can be run in parallel. We can do this using thread pools.
Bounded and unbounded thread pools
Our current limit, or lack thereof, is specified by our thread pool. The cats-effect `IORuntime` has a thread pool under the hood. The `basicRuntime` we’ve been using has an unbounded thread pool: it can execute an unlimited number of tasks in parallel.
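The difference between bounded and unbounded pools can be sketched with plain JDK executors. This is an illustration, not the actual internals of `basicRuntime` or `boundedRuntime`: a fixed pool of two never runs more than two tasks at once, no matter how many we submit.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

val pool = Executors.newFixedThreadPool(2) // bounded: at most two threads

val running = new AtomicInteger(0) // tasks currently executing
val maxSeen = new AtomicInteger(0) // highest concurrency observed

val futures = List.fill(10)(pool.submit(new Runnable {
  def run(): Unit = {
    val now = running.incrementAndGet()
    maxSeen.updateAndGet(m => math.max(m, now)) // record the peak
    Thread.sleep(50L)
    running.decrementAndGet()
  }
}))
futures.foreach(_.get())
pool.shutdown()

// maxSeen.get() never exceeds 2: the other eight tasks queue up
// until one of the two threads frees up.
```

All ten tasks are submitted immediately, but the pool itself enforces the limit by queueing the rest.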
In our `Threading` setup code, we declared another `boundedRuntime` function. Let’s give it a spin.
We can pick a bound of two for ten `factorial` tasks:
```scala
time(tenFactorials).unsafeRunSync()(boundedRuntime(2))
// res7: String = "The task took 12 seconds."
```
It’s much slower than before — only two tasks are run at once.
```scala
timedFactorials.unsafeRunSync()(boundedRuntime(2))
// res8: List[String] = List(
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds."
// )
```
Unlike with the previous unbounded thread pool, each task takes two seconds: the tasks might be scheduled at once, but they’re fired off over time, once a thread is free to compute them.
What if we set the bound higher?
```scala
timedFactorials.unsafeRunSync()(boundedRuntime(20))
// res9: List[String] = List(
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 5 seconds.",
//   "The task took 4 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 5 seconds.",
//   "The task took 5 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds.",
//   "The task took 6 seconds."
// )
```
The `timedFactorials` task behaves as if it were running on the `basicRuntime`: it’s as if we didn’t have a bound at all.
If you think about it, this makes sense: if we have more computations running than the number of processors, each processor will still need to switch between them. Our `factorial` tasks will end up being paused by the processor and taking longer.
So far, we’ve experimented with bounds of two and twenty. Having two tasks run at once gets around our thread-switching problem: each processor can focus on a single task. But having only two isn’t too useful: most of our processors aren’t doing anything Scala-related.
The best limit probably corresponds to the number of processors. Let’s check:
```scala
timedFactorials.unsafeRunSync()(boundedRuntime(numProcessors))
// res10: List[String] = List(
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds.",
//   "The task took 2 seconds."
// )
```
Sure enough, each task takes two seconds.
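The same sizing rule applies to plain JDK pools, not just cats-effect runtimes. A brief sketch, with illustrative names: for CPU-bound work, a fixed pool sized to `availableProcessors()` keeps every core busy without forcing extra task-switching.

```scala
import java.util.concurrent.Executors

val numProcessors = Runtime.getRuntime().availableProcessors()

// One thread per processor: every core can work on a task at once,
// and no task needs to be paused to make room for another.
val cpuPool = Executors.newFixedThreadPool(numProcessors)

cpuPool.shutdown()
```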
Snoozing
A thread pool bounded at `numProcessors` is the best option for the `factorial` task. But what about `snooze`?
We know that we can run an unlimited number of parallel `snooze` tasks using the `basicRuntime`, which had an unbounded thread pool. What about our `boundedRuntime`?
Let’s test it by running more tasks than processors. We can construct an `IO` that runs ten tasks in parallel:
```scala
val tenSnoozes: IO[Unit] = List.fill(10)(snooze).parSequence.void
```
Let’s try running this on our bounded thread pool.
```scala
time(tenSnoozes).unsafeRunSync()(boundedRuntime(numProcessors))
// res11: String = "The task took 4 seconds."
```
Not two, but four seconds. The `Thread.sleep` call might not hog a processor, but it does hog a thread in our pool: the first eight snoozes occupy all eight threads for two seconds, and the remaining two must wait their turn.
By choosing a bounded thread pool for our `tenSnoozes` task, we cause it to take longer. If we want to get our task to complete as fast as possible, it seems better to have an unbounded pool.
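We can check the arithmetic with a plain JDK pool (an illustrative sketch, separate from `boundedRuntime`): with more sleeps than threads, the tasks run in batches, so the total time is roughly the sleep duration times the number of batches.

```scala
import java.util.concurrent.Executors

val pool = Executors.newFixedThreadPool(4) // fewer threads than tasks

val start = System.nanoTime()
val futures = List.fill(10)(pool.submit(new Runnable {
  def run(): Unit = Thread.sleep(200L)
}))
futures.foreach(_.get())
val elapsedMs = (System.nanoTime() - start) / 1000000L

// Ten sleeps on four threads run in three batches (4 + 4 + 2),
// so the elapsed time is roughly 3 * 200ms rather than 200ms.
pool.shutdown()
```

The same batching happened with `tenSnoozes`: ten sleeps on eight threads take two batches, hence four seconds instead of two.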