Intro to Cats Effect
Notes on Adam Rosien's presentation to Skills Matter from 04/2021, in partnership with 47 Degrees
Who is Adam?
Works for Inner Product. Partner is Noel Welsh.
Wrote Essential Effects.
Outline
- Effects: what and why
- IO: make side-effects safe
- Parallel execution: declarative parallelism
- Shifting contexts: effect isolation
- Managing resources: ensure safe dependency lifecycles
Effects
The substitution model of evaluation
val x = (3 * 4) + (2 * 5)
val x = 12 + (2 * 5)
val x = 12 + 10
val x = 22
We understand the top expression by evaluating it down into parts. We use that as a model for what the computer does. The computer may not necessarily do the same evaluations in the same order, but we arrive at the same answer.
Substitution model: We can now replace expressions with their evaluation.
But this doesn't work in every scenario.
Breaking substitution
- When printing to the console:
- val x = println("Hello world!")
+ val x = () // this is not the same as `println`. `println` returns Unit, but has a side effect!
- When reading values from the outside world:
- val name = readLine
+ val name = <whatever you typed into the console> // this is not the same as `readLine`. The value matches _in this case_, but `readLine` can return something different every time it is called.
- When expressions refer to mutable variables:
var i = 12
- val x = { i += 1; i }
+ val x = 13 // similar to #2
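The mutable-variable case can be checked with plain Scala. This is a minimal sketch, not from the talk: SubstitutionDemo, original and substituted are made-up names, and "substitution" here means replacing each use of x with its defining expression.

```scala
object SubstitutionDemo {
  // Original program: the block is evaluated once and its value is reused.
  def original(): (Int, Int) = {
    var i = 12
    val x = { i += 1; i }
    (x, x)
  }

  // "Substituted" program: each use of x is replaced by the defining expression,
  // so the mutation of i now happens twice.
  def substituted(): (Int, Int) = {
    var i = 12
    ({ i += 1; i }, { i += 1; i })
  }
}
```

original() gives (13, 13) while substituted() gives (13, 14), so substitution changed the meaning of the program.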
Future type
val twice =
  Future(println("Hello world!"))
    .flatMap(_ => Future(println("Hello world!")))
# Result
Hello world!
Hello world!
Okay, I see Future(println("Hello world!")) duplicated. Can I dedupe it in a val?
val print = Future(println("Hello world!"))
print.flatMap(_ => print)
# Result
Hello world!
I cannot!
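The same behaviour can be observed without reading console output. A minimal sketch using only the standard library: a counter stands in for println, and FutureDemo, inlined and factoredOut are hypothetical names chosen for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureDemo {
  // Two separate Future(...) expressions: the side effect runs twice.
  def inlined(): Int = {
    val runs = new AtomicInteger(0)
    val twice = Future(runs.incrementAndGet())
      .flatMap(_ => Future(runs.incrementAndGet()))
    Await.result(twice, 5.seconds)
    runs.get()
  }

  // One Future factored out into a val: it starts running as soon as it is
  // constructed, and reusing the val only reuses its (cached) result.
  def factoredOut(): Int = {
    val runs = new AtomicInteger(0)
    val effect = Future(runs.incrementAndGet())
    val twice = effect.flatMap(_ => effect)
    Await.result(twice, 5.seconds)
    runs.get()
  }
}
```

inlined() returns 2 and factoredOut() returns 1, which is the "Hello world!" printed twice versus once from above.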
The problem
What happens using substitution != what really happens because side effects happen too soon!
We need to make side effects NOT happen too soon! Things that happen now need to happen later...
The fix
Delay side effects so effects can compose (create a wrapper for the side effects).
case class MyIO[A](unsafeRun: () => A) {
  def map[B](f: A => B): MyIO[B] =
    MyIO(() => f(unsafeRun()))
  def flatMap[B](f: A => MyIO[B]): MyIO[B] =
    MyIO(() => f(unsafeRun()).unsafeRun())
}
Is unsafeRun called during construction? No.
Is unsafeRun called during composition? No.
Nice!
With the previous example
val print = MyIO(() => println("Hello world!"))
val twice = print.flatMap(_ => print)
twice.unsafeRun()
Result:
Hello world!
Hello world!
We did it!
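To check the "not during construction, not during composition" claims, here is a small sketch that repeats the MyIO definition so it stands alone and swaps println for a counter. MyIODemo and counts are hypothetical names, not part of the talk.

```scala
final case class MyIO[A](unsafeRun: () => A) {
  def map[B](f: A => B): MyIO[B] =
    MyIO(() => f(unsafeRun()))
  def flatMap[B](f: A => MyIO[B]): MyIO[B] =
    MyIO(() => f(unsafeRun()).unsafeRun())
}

object MyIODemo {
  // Returns the counter value after composing the effects and after running them.
  def counts(): (Int, Int) = {
    var runs = 0
    val effect = MyIO(() => { runs += 1 })   // construction: nothing runs yet
    val twice = effect.flatMap(_ => effect)  // composition: still nothing runs
    val afterComposition = runs
    twice.unsafeRun()                        // both side effects happen here
    (afterComposition, runs)
  }
}
```

counts() returns (0, 2): nothing happened until unsafeRun was called, and then the factored-out effect ran twice.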
Summary
- Substitution: replacing an expression with its evaluation.
- If substitution doesn't change the meaning of the program, we can safely inline definitions, or factor definitions out.
- If substitution does change the meaning of the program, delay the side effect.
- Effects separate describing what we want to happen from actually executing it. We can freely substitute the description of an effect up until the point we run it.
- We created the MyIO[A] effect, which delays the side effect until the unsafeRun method is called. We produced new MyIO values with the map and flatMap combinators.
From example MyIO to cats.effect.IO
Constructors
Creating effects:
- IO.delay[A](a: => A): IO[A] (aka IO.apply)
- IO.pure[A](a: A): IO[A]
- ...
Combinators
Once we have an IO from a constructor, we can transform it:
- map: convert
- mapN: combine
- flatMap: perform one IO and then perform another IO
- handleErrorWith: deal with exceptions
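A short sketch of these combinators on tiny values. It assumes cats-effect is on the classpath; the scala-cli dependency line and its version number are my assumption, and CombinatorsDemo and its fields are made-up names.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.IO
import cats.syntax.all._ // brings mapN into scope for tuples

object CombinatorsDemo {
  val one: IO[Int] = IO.pure(1)      // already-computed value
  val two: IO[Int] = IO.delay(1 + 1) // delayed computation

  // map: convert the result of an effect
  val doubled: IO[Int] = two.map(_ * 2)

  // mapN: combine the results of several effects
  val summed: IO[Int] = (one, two).mapN(_ + _)

  // flatMap: perform one IO, then another that may depend on the first result
  val chained: IO[Int] = one.flatMap(a => two.map(b => a * 10 + b))

  // handleErrorWith: replace a failed effect with a fallback effect
  val failing: IO[Int] = IO.raiseError(new RuntimeException("boom"))
  val recovered: IO[Int] = failing.handleErrorWith(_ => IO.pure(-1))
}
```

None of these vals runs anything on its own; they are descriptions that only execute once handed to an eliminator or to IOApp.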
Eliminators
Executing effects:
- unsafeRunSync: A
- unsafeToFuture: Future[A]
- ...
These are dangerous! We can no longer rely on substitution if we do this.
From generic App to cats.effect.IOApp
Instead of writing a def main, you write a def run:
import cats.effect.{ExitCode, IO, IOApp}
object HelloWorld extends IOApp {
  // no need to call `unsafeRun` ourselves: IOApp runs the effect for us
  def run(args: List[String]): IO[ExitCode] =
    helloWorld.as(ExitCode.Success)
  val helloWorld: IO[Unit] =
    for {
      _ <- putStr("hello")
      _ <- putStr("world")
    } yield ()
  // delay the side effect of printing
  def putStr(s: => String): IO[Unit] = IO(println(s))
}
Your application as one effect.
Summary
- cats.effect.IO is an effect that can encapsulate any side effect.
  a. Constructors produce an IO from pure values, delayed side effects, errors, and other types like Future.
  b. Combinators let you build new effects, transform their outputs, and handle errors.
  c. Eliminators execute IO values. You should only run them at the very "edges" of your programs.
- cats.effect.IOApp lets you describe your programs as a single IO effect that it executes.
Parallel Execution
Help for Seeing Parallelism
What is parallelism? For the JVM, it means running multiple threads at the same time.
import cats.effect.IO
object debug {
  implicit class DebugHelper[A](ioa: IO[A]) {
    // print the result of the effect, prefixed with the current thread's name
    def debug: IO[A] =
      for {
        a <- ioa
        tn = Thread.currentThread.getName
        _ = println(s"[$tn] $a")
      } yield a
  }
}
You can print the computed value along with the current thread name using a Scala extension method, via implicit classes.
Composing 2 effects via mapN
(IO[A], IO[B]) => ((A, B) => C) => IO[C]
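import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._ // mapN, parMapN, parTraverse, parSequence
import debug._ // the DebugHelper defined above
object MapNExample extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    seq.as(ExitCode.Success)
  val hello = IO("hello").debug
  val world = IO("world").debug
  val seq = (hello, world)
    .mapN((h, w) => s"$h $w")
    .debug
}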
Result:
[ioapp-compute-0] hello
[ioapp-compute-0] world
[ioapp-compute-0] hello world
By default, IO has no concept of concurrency: mapN runs the effects one after another on the same thread, ioapp-compute-0.
Using parMapN for concurrency
(IO[A], IO[B]) => ((A, B) => C) => IO[C]
object ParMapNExample extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    par.as(ExitCode.Success)
  val hello = IO("hello").debug
  val world = IO("world").debug
  val par = (hello, world)
    .parMapN((h, w) => s"$h $w")
    .debug
}
Result:
[ioapp-compute-1] world
[ioapp-compute-0] hello
[ioapp-compute-0] hello world
Two different threads! Yay!
parTraverse
List[A] => (A => IO[B]) => IO[List[B]]
object ParTraverseExample extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    tasks
      .parTraverse(task)
      .debug
      .as(ExitCode.Success)
  val numTasks = 100
  val tasks: List[Int] = List.range(0, numTasks)
  def task(id: Int): IO[Int] = IO(id).debug
}
Result:
[ioapp-compute-7] 7
[ioapp-compute-0] 0
[ioapp-compute-6] 6
[ioapp-compute-1] 1
...
parSequence
List[IO[A]] => IO[List[A]]
object ParSequenceExample extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    tasks
      .parSequence // takes no function: the list already holds the effects
      .debug
      .as(ExitCode.Success)
  val numTasks = 100
  val tasks: List[IO[Int]] = List.tabulate(numTasks)(task)
  def task(id: Int): IO[Int] = IO(id).debug
}
Summary
- IO does not model parallel execution itself, with respect to its monadic or applicative behavior.
- IO supports explicit parallelism: parMapN, parTraverse, and parSequence are the parallel versions of (the sequential) mapN, traverse, and sequence.
- Errors are managed in a fail-fast manner.
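A tiny sketch of the error behaviour, assuming Cats Effect 3 (on Cats Effect 2 an implicit ContextShift[IO] would also need to be in scope for parMapN). The scala-cli dependency line, its version, and the names FailFastDemo, ok, boom and combined are my assumptions for illustration.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.IO
import cats.syntax.all._ // parMapN

object FailFastDemo {
  val ok: IO[Int] = IO.pure(1)
  val boom: IO[Int] = IO.raiseError(new RuntimeException("boom"))

  // parMapN needs both results, so a failure on either side fails the whole
  // composed effect; attempt surfaces that failure as a Left instead of
  // throwing when the effect is run.
  val combined: IO[Either[Throwable, Int]] =
    (ok, boom).parMapN(_ + _).attempt
}
```

Running combined yields a Left holding the "boom" exception; the sum is never produced.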
How much parallelism can we get?
- We can schedule as many effects as we like via parMapN, etc.
- But we only have n CPUs.
- Let's say I have 8 CPUs, 8 threads, and 10000 effects. These effects will be fighting for resources.
- How do they "share"? What happens if they don't?
We can set asynchronous boundaries, limiting the work done on individual threads, using IO.shift (a Cats Effect 2 operation). We can shift work from one thread to another to share the work across threads. We can declare where it's ok to be rescheduled onto another thread.
Why have multiple contexts?
We may want to have separate thread pools and isolate effects from others.
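The idea of isolating work on separate pools can be sketched without Cats Effect at all. The following uses plain scala.concurrent.Future and two hand-made pools, which is a stand-in technique rather than what the talk shows; ContextsDemo, pool and threadNames are hypothetical names.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object ContextsDemo {
  // Hypothetical helper: a one-thread pool whose thread carries the given name.
  private def pool(name: String): ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(
      Executors.newSingleThreadExecutor(new ThreadFactory {
        def newThread(r: Runnable): Thread = {
          val t = new Thread(r, name)
          t.setDaemon(true)
          t
        }
      })
    )

  // Runs one task per pool and reports which thread each task ran on.
  def threadNames(): (String, String) = {
    val compute = pool("compute")
    val blocking = pool("blocking")
    try {
      val onCompute = Future(Thread.currentThread.getName)(compute)
      val onBlocking = Future {
        Thread.sleep(50) // stands in for blocking I/O
        Thread.currentThread.getName
      }(blocking)
      (Await.result(onCompute, 5.seconds), Await.result(onBlocking, 5.seconds))
    } finally {
      compute.shutdown()
      blocking.shutdown()
    }
  }
}
```

threadNames() returns ("compute", "blocking"): the sleeping task never occupies the compute thread.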
Blocking (Cats Effect 2)
What happens when something blocks? We may want to have separate threads for blocking so that we are not affecting the threads used by other effects. For example, some threads just for computations and other threads for blocking operations.
def run(args: List[String]): IO[ExitCode] =
  Blocker[IO].use { blocker =>
    withBlocker(blocker).as(ExitCode.Success)
  }
def withBlocker(blocker: Blocker): IO[Unit] =
  for {
    _ <- IO("on default").debug
    _ <- blocker.blockOn(IO("on blocker").debug)
    _ <- IO("where am I?").debug
  } yield ()
Result:
[ioapp-compute-0] on default
[cats-effect-blocker-0] on blocker
[ioapp-compute-1] where am I?
Summary
- Threads abstract over what is concurrently executing atop the available set of processors, so we can have many more threads than CPUs.
- Asynchronous boundaries help to ensure applications make progress, and effects can resume on a different context in order to isolate various workloads from one another.
- IOApp provides a default ExecutionContext with a fixed number of threads (the number of CPUs on the machine). This is meant for CPU-bound (non-blocking) work.
- Blocking I/O-bound work should be run in an unbounded thread pool. Cats Effect 2 provides the Blocker interface to declare effects that block; in Cats Effect 3 you use IO.blocking.
Resources
What are they?
They are pieces of state. There is a cost to creating them. You need to clean them up when you are done using them.
- examples: file handles, database connections
- state with defined acquire and release lifecycles
Creating a Resource
We capture the acquire and release steps in a data structure so the lifecycle is easier to handle.
/**
 * @param acquire acquires the resource
 * @param release cleans up the resource; always runs at the end, provided the acquisition happened
 */
def make[A](acquire: IO[A])(release: A => IO[Unit]): Resource[IO, A]
Once you have a resource, you can use it:
object BasicResource extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    stringResource
      .use { s =>
        IO(s"$s is so cool!").debug
      }
      .as(ExitCode.Success)
  val stringResource: Resource[IO, String] =
    Resource.make(
      IO("> acquiring stringResource").debug *> IO("String")
    )(_ => IO("< releasing stringResource").debug.void)
}
Result:
[ioapp-compute-0] > acquiring stringResource
[ioapp-compute-0] String is so cool!
[ioapp-compute-0] < releasing stringResource
Resources may be used for dependency management in applications, too.
You can also compose Resources serially and in parallel!
mapN, parMapN:
(Resource[IO, A], Resource[IO, B]) => ((A, B) => C) => Resource[IO, C]
Then, you can compose all your app's resources into one!
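A sketch of serial composition with mapN that records the order of events. It assumes cats-effect is available; the scala-cli dependency line and its version are my assumption, and ResourceOrderDemo, record and res are hypothetical names. The "resources" are just strings so the lifecycle is easy to see.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import java.util.concurrent.atomic.AtomicReference
import cats.effect.{IO, Resource}
import cats.syntax.all._ // mapN

object ResourceOrderDemo {
  // An effect that composes two resources, uses them, and returns the event log.
  val events: IO[List[String]] =
    IO.delay(new AtomicReference(List.empty[String])).flatMap { log =>
      def record(msg: String): IO[Unit] =
        IO.delay { log.updateAndGet((xs: List[String]) => xs :+ msg); () }

      // Hypothetical resource: acquiring it just logs and yields its name.
      def res(name: String): Resource[IO, String] =
        Resource.make(record(s"acquire $name").as(name))(_ => record(s"release $name"))

      // Serial composition: a is acquired before b.
      val both: Resource[IO, String] =
        (res("a"), res("b")).mapN((a, b) => s"$a+$b")

      both.use(s => record(s"use $s")).flatMap(_ => IO.delay(log.get()))
    }
}
```

When run, the log reads acquire a, acquire b, use a+b, release b, release a: the composed resource releases its parts in the reverse order of acquisition.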
Summary
- Effects: maintain substitution by delaying execution
- Side effects are bugs. We can't follow them easily...
- IO: an effect that can safely capture any side effect
- Parallel execution: par- (yay)
- Shifting contexts: distinguish blocking vs. non-blocking effects to improve throughput and make progress
- Managing resources: separate acquire-release from use; dependency lifecycle management
And more: Fiber, IO.async, Ref, Deferred...