Intro to Cats Effect

Notes on Adam Rosien's presentation to Skills Matter from 04/2021 in partnership with 47D

Who is Adam?

Works at Inner Product, where his partner is Noel Welsh.

Wrote Essential Effects.

Outline

  • Effects: what and why
  • IO: make side-effects safe
  • Parallel execution: declarative parallelism
  • Shifting contexts: effect isolation
  • Managing resources: ensure safe dependency lifecycles

Effects

The substitution model of evaluation

  1. val x = (3 * 4) + (2 * 5)
  2. val x = 12 + (2 * 5)
  3. val x = 12 + 10
  4. val x = 22

We understand the top expression by reducing it step by step, evaluating its subexpressions. We use that as a model for what the computer does. The computer may not perform the same evaluations in the same order, but it arrives at the same answer.

Substitution model: We can now replace expressions with their evaluation.
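A minimal sketch in plain Scala (no libraries needed) of why substitution works for pure expressions: referring to `x` by name and pasting its defining expression in its place give the same result. The object and value names are illustrative, not from the talk.

```scala
object SubstitutionHolds {
  // Pure arithmetic: evaluating it does nothing except produce a value.
  val x: Int = (3 * 4) + (2 * 5) // reduces to 22

  // Refer to the expression by name...
  val viaName: Int = x + x

  // ...or substitute the expression itself for each use of the name.
  // Both are 44, so factoring `x` out or inlining it doesn't change the program.
  val viaSubstitution: Int = ((3 * 4) + (2 * 5)) + ((3 * 4) + (2 * 5))
}
```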

But substitution doesn't work in every scenario.

Breaking substitution

  1. When printing to the console:
- val x = println("Hello world!")
+ val x = () // this is not the same as `println`. `println` returns Unit, but it also has a side effect!
  2. When reading values from the outside world:
- val name = scala.io.StdIn.readLine()
+ val name = <whatever you typed into the console> // this is not the same as `readLine`. It happens to give the same value _this time_, but `readLine` can return a different value each time it runs.
  3. When expressions refer to mutable variables:
var i = 12

- val x = { i += 1; i }
+ val x = 13 // similar to #2: the value depends on when the expression is evaluated
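A small plain-Scala sketch of the third case: with a mutable `var`, evaluating the block once and reusing its result is not the same as substituting the block at every use site. The names are illustrative.

```scala
object MutableBreaksSubstitution {
  var i = 12

  // Evaluate the block once, then refer to its result twice: 13 + 13.
  val x: Int = { i += 1; i }
  val viaName: Int = x + x

  i = 12 // reset so both versions start from the same state

  // Substitute the block for each use of x: the increment happens twice, 13 + 14.
  val viaSubstitution: Int = ({ i += 1; i }) + ({ i += 1; i })
}
```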

Future type

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val twice =
    Future(println("Hello world!"))
        .flatMap(_ => Future(println("Hello world!")))
# Result
Hello world!
Hello world!

Okay, I see Future(println("Hello world!")) duplicated. Can I factor it out into a val?

val print = Future(println("Hello world!"))
val twice = print.flatMap(_ => print)
# Result
Hello world!

I cannot!

The problem

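What happens using substitution != what really happens, because side effects happen too soon! Future is eager: `print` starts running println as soon as it is defined, and flatMap only reuses that already-completed result instead of printing again.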

We need to make side effects NOT happen too soon! Things that happen now need to happen later...
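To make "too soon" concrete, here is a plain-Scala sketch (standard library only) that swaps println for an AtomicInteger counter, an assumption made so the number of runs can be checked. Because a Future starts running when it is constructed, `twice` performs the side effect only once.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureIsEager {
  // Counts how many times the side effect actually runs (stands in for println).
  val runs = new AtomicInteger(0)

  // The Future is scheduled as soon as this val is initialized...
  val print: Future[Unit] = Future { runs.incrementAndGet(); () }

  // ...so flatMapping onto it again just reuses the same, already-started computation.
  val twice: Future[Unit] = print.flatMap(_ => print)

  def runsAfterTwice(): Int = {
    Await.result(twice, 5.seconds)
    runs.get()
  }
}
```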

The fix

Delay side effects so that effects can compose (create a wrapper for the side effects).

case class MyIO[A](unsafeRun: () => A) {
    def map[B](f: A => B): MyIO[B] = 
        MyIO(() => f(unsafeRun()))
    def flatMap[B](f: A => MyIO[B]): MyIO[B] =
        MyIO(() => f(unsafeRun()).unsafeRun())
}

Is unsafeRun called during construction? No.

Is unsafeRun called during composition? No.

Nice!
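The two "No" answers can be checked directly. A minimal sketch, repeating MyIO from above and using a mutable counter (`runs`, an illustrative name) in place of println: building and composing the effects leaves the counter at 0, and only unsafeRun increments it.

```scala
object MyIOIsLazy {
  case class MyIO[A](unsafeRun: () => A) {
    def map[B](f: A => B): MyIO[B] =
      MyIO(() => f(unsafeRun()))
    def flatMap[B](f: A => MyIO[B]): MyIO[B] =
      MyIO(() => f(unsafeRun()).unsafeRun())
  }

  var runs = 0 // how many times the side effect has executed

  // Construction: the lambda is stored, not called.
  val tick: MyIO[Unit] = MyIO(() => runs += 1)

  // Composition: flatMap only builds a new, bigger description.
  val twice: MyIO[Unit] = tick.flatMap(_ => tick)

  val runsBeforeUnsafeRun: Int = runs // still 0 at this point
}
```

Calling `MyIOIsLazy.twice.unsafeRun()` afterwards runs `tick` twice, leaving `runs` at 2.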

With the previous example

val print = MyIO(() => println("Hello world!"))
val twice = print.flatMap(_ => print)

twice.unsafeRun()

Result:

Hello world!
Hello world!

We did it!

Summary

  1. Substitution: replacing an expression with its evaluation.
  2. If substitution doesn't change the meaning of the program, we can safely inline definitions, or factor definitions out.
  3. If substitution does change the meaning of the program, delay the side effect.
  4. Effects separate describing what we want to happen from actually executing it. We can freely substitute the description of effects up until the point we run them.
  5. We created the MyIO[A] effect, which delayed the side effect until the unsafeRun method is called. We produced new MyIO values with the map and flatMap combinators.

From example MyIO to cats.effect.IO

Constructors

create effects

  • IO.delay[A](a: => A): IO[A] aka IO.apply
  • IO.pure[A](a: A): IO[A]
  • ...
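A minimal sketch of these constructors, assuming Cats Effect 3.x is on the classpath (the scala-cli `using dep` line pins 3.5.4 as an example). The counter `evaluated` is illustrative: it shows that IO.delay defers its argument, while IO.pure receives a value that has already been computed. IO.raiseError, another constructor, builds a failed effect.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.IO
import cats.effect.unsafe.implicits.global // runtime needed to call unsafeRunSync

object Constructors {
  var evaluated = 0 // counts how often the delayed block has run

  // IO.delay (aka IO.apply) takes its argument by name: nothing runs yet.
  val delayed: IO[Int] = IO.delay { evaluated += 1; 42 }

  // IO.pure takes an already-evaluated value, so never pass it a side effect.
  val pure: IO[Int] = IO.pure(42)

  // IO.raiseError describes an effect that fails when it is run.
  val failed: IO[Int] = IO.raiseError(new RuntimeException("boom"))
}
```

Running `Constructors.delayed.unsafeRunSync()` returns 42 and bumps `evaluated` to 1; running it again bumps it to 2, because the block is re-executed each time.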

Combinators

Once we have an IO from a constructor, we can transform it:

  • map - convert
  • mapN - combine
  • flatMap - perform 1 IO and then perform another IO
  • handleErrorWith - deal with exceptions
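A sketch of these combinators, assuming Cats Effect 3.x plus the cats syntax import (which provides mapN on tuples of IO). The values reuse the arithmetic from the substitution example; the comments give what each effect produces when run.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.IO
import cats.effect.unsafe.implicits.global // runtime needed to call unsafeRunSync
import cats.syntax.all._

object Combinators {
  val twelve: IO[Int] = IO(3 * 4)
  val ten: IO[Int] = IO(2 * 5)

  // map: transform the output of one effect (24 when run).
  val doubled: IO[Int] = twelve.map(_ * 2)

  // mapN: combine the outputs of independent effects (22 when run).
  val sum: IO[Int] = (twelve, ten).mapN(_ + _)

  // flatMap: run one effect, then choose the next effect from its result.
  val chained: IO[String] = sum.flatMap(n => IO(s"total = $n"))

  // handleErrorWith: recover from a failed effect by switching to another effect.
  val recovered: IO[Int] =
    IO.raiseError[Int](new IllegalStateException("boom")).handleErrorWith(_ => IO.pure(0))
}
```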

Eliminators

Executing effects:

  • unsafeRunSync: A
  • unsafeToFuture: Future[A]
  • ...

These are dangerous! Running an effect performs its side effects, so we can no longer rely on substitution once we call them.
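A sketch of both eliminators under Cats Effect 3.x, where (unlike Cats Effect 2) the unsafe methods need an implicit IORuntime; importing cats.effect.unsafe.implicits.global is the usual way to get one outside IOApp. Each call re-runs the effect, so "running!" is printed once per call.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.IO
import cats.effect.unsafe.implicits.global // supplies the IORuntime the unsafe* methods need
import scala.concurrent.Await
import scala.concurrent.duration._

object Eliminators {
  // A description only: nothing is printed when this val is defined.
  val program: IO[Int] = IO(println("running!")).as(42)

  // Each call runs the effect again, side effect included.
  def runSync(): Int = program.unsafeRunSync()
  def runViaFuture(): Int = Await.result(program.unsafeToFuture(), 5.seconds)
}
```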

From generic App to cats.effect.IOApp

Instead of writing a def main, you write a def run:

import cats.effect.{ExitCode, IO, IOApp}

object HelloWorld extends IOApp {
    // no need to call `unsafeRun*` yourself: IOApp runs the effect returned by `run`
    def run(args: List[String]): IO[ExitCode] =
        helloWorld.as(ExitCode.Success)
    val helloWorld: IO[Unit] =
        for {
            _ <- putStr("hello")
            _ <- putStr("world")
        } yield ()
    def putStr(s: => String): IO[Unit] = IO(println(s))
}

Your application as one effect.
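For comparison, a sketch of the same program against Cats Effect 3.x, assuming its IOApp.Simple variant (a `run: IO[Unit]` with no arguments or exit code) and its built-in IO.println, which replaces the hand-written putStr.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.{IO, IOApp}

object HelloWorldSimple extends IOApp.Simple {
  // The whole program is a single IO value...
  val helloWorld: IO[Unit] =
    for {
      _ <- IO.println("hello")
      _ <- IO.println("world")
    } yield ()

  // ...which IOApp runs for us; no unsafeRun* call needed.
  def run: IO[Unit] = helloWorld
}
```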

Summary

  1. cats.effect.IO is an effect that can encapsulate any side effect.
    a. Constructors produce an IO from pure values, delayed side effects, errors, and other types like Future.
    b. Combinators let you build new effects, transform their outputs, and handle errors.
    c. Eliminators execute IO values. You should only run them at the very "edges" of your programs.
  2. cats.effect.IOApp lets you describe your programs as a single IO effect that it executes.

Parallel Execution

Help for Seeing Parallelism

What is parallelism? For the JVM, it means running multiple threads at the same time.

import cats.effect.IO

object debug {
    implicit class DebugHelper[A](ioa: IO[A]) {
        // Runs `ioa`, then prints its result tagged with the current thread's name.
        def debug: IO[A] =
            for {
                a <- ioa
                tn = Thread.currentThread.getName
                _ = println(s"[$tn] $a")
            } yield a
    }
}

The `debug` extension method (added to IO via an implicit class) prints each computed value along with the name of the thread that computed it.

Composing 2 effects via mapN

(IO[A], IO[B]) => ((A, B) => C) => IO[C]

import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import debug._

object MapNExample extends IOApp {
    def run(args: List[String]): IO[ExitCode] =
        seq.as(ExitCode.Success)
    val hello = IO("hello").debug
    val world = IO("world").debug
    val seq = (hello, world)
        .mapN((h, w) => s"$h $w")
        .debug
}

Result:

[ioapp-compute-0] hello
[ioapp-compute-0] world
[ioapp-compute-0] hello world

mapN on its own is sequential: IO does not run effects in parallel unless you ask it to, so hello, world, and the combined result all run on the same thread, ioapp-compute-0.

Using parMapN for concurrency

(IO[A], IO[B]) => ((A, B) => C) => IO[C]

import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import debug._

object ParMapNExample extends IOApp {
    def run(args: List[String]): IO[ExitCode] =
        par.as(ExitCode.Success)
    val hello = IO("hello").debug
    val world = IO("world").debug
    val par = (hello, world)
        .parMapN((h, w) => s"$h $w")
        .debug
}

Result:

[ioapp-compute-1] world
[ioapp-compute-0] hello
[ioapp-compute-0] hello world

Two different threads! Yay!
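Thread names are one way to see parallelism; elapsed time is another. A sketch assuming Cats Effect 3.x, using IO.sleep as a stand-in for slow work and `timed` to measure each version: the mapN version should take at least a second, while the parMapN version finishes in roughly half that. Exact timings depend on the machine.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.IO
import cats.effect.unsafe.implicits.global // runtime needed to call unsafeRunSync
import cats.syntax.all._
import scala.concurrent.duration._

object ParallelTiming {
  // Two independent "slow" effects; IO.sleep stands in for real work.
  val a: IO[Int] = IO.sleep(500.millis).as(1)
  val b: IO[Int] = IO.sleep(500.millis).as(2)

  // mapN: b starts only after a has finished (about 1 second in total).
  val sequential: IO[(FiniteDuration, Int)] = (a, b).mapN(_ + _).timed

  // parMapN: a and b run at the same time (about 0.5 seconds in total).
  val parallel: IO[(FiniteDuration, Int)] = (a, b).parMapN(_ + _).timed
}
```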

parTraverse

List[A] => (A => IO[B]) => IO[List[B]]

import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import debug._

object ParTraverseExample extends IOApp {
    def run(args: List[String]): IO[ExitCode] =
        tasks
            .parTraverse(task)
            .debug
            .as(ExitCode.Success)
    val numTasks = 100
    val tasks: List[Int] = List.range(0, numTasks)

    def task(id: Int): IO[Int] = IO(id).debug
}

Result:

[ioapp-compute-7] 7
[ioapp-compute-0] 0
[ioapp-compute-6] 6
[ioapp-compute-1] 1
...

parSequence

List[IO[A]] => IO[List[A]]

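import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import debug._

object ParSequenceExample extends IOApp {
    def run(args: List[String]): IO[ExitCode] =
        tasks
            .parSequence // takes no argument: the list already holds the effects
            .debug
            .as(ExitCode.Success)
    val numTasks = 100
    val tasks: List[IO[Int]] = List.tabulate(numTasks)(task)

    def task(id: Int): IO[Int] = IO(id).debug
}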
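The debug lines from these examples arrive in an arbitrary order, but the resulting list does not. A sketch assuming Cats Effect 3.x and the cats syntax import: both parTraverse and parSequence return their results in the same order as their inputs.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.IO
import cats.effect.unsafe.implicits.global // runtime needed to call unsafeRunSync
import cats.syntax.all._

object ParTraverseOrder {
  val inputs: List[Int] = List.range(0, 100)

  // The effects may run in any order, across threads...
  val viaParTraverse: IO[List[Int]] = inputs.parTraverse(i => IO(i * 2))
  val viaParSequence: IO[List[Int]] = inputs.map(i => IO(i * 2)).parSequence
  // ...but each result list lines up with `inputs`: List(0, 2, 4, ..., 198).
}
```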

Summary

  1. IO does not model parallel execution itself, with respect to its monadic or applicative behavior.
  2. IO supports explicit parallelism: parMapN, parTraverse, parSequence are the parallel versions of (the sequential) mapN, traverse, and sequence. Errors are managed in a fail-fast manner.
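A sketch of the fail-fast behavior, assuming Cats Effect 3.x: one effect sleeps (a stand-in for slow work), the other fails right away, and the combined effect fails with that error well before the sleep would have finished.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.IO
import cats.effect.unsafe.implicits.global // runtime needed to call unsafeRunSync
import cats.syntax.all._
import scala.concurrent.duration._

object FailFast {
  // Would take 5 seconds to complete if left alone.
  val slow: IO[Int] = IO.sleep(5.seconds).as(1)

  // Fails immediately.
  val boom: IO[Int] = IO.raiseError(new RuntimeException("boom"))

  // When `boom` fails, parMapN cancels `slow` and fails with the same error
  // instead of waiting the full 5 seconds.
  val combined: IO[(FiniteDuration, Either[Throwable, Int])] =
    (slow, boom).parMapN(_ + _).attempt.timed
}
```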

How much parallelism can we get?

  • We can schedule as many effects as we like via parMapN, etc.
  • But we only have n CPUs.
  • Let's say I have 8 CPUs, 8 threads, and 10000 effects. These effects will be fighting for resources.
  • How do they "share"? What happens if they don't?
Asynchronous boundaries

We can insert asynchronous boundaries with IO.shift (Cats Effect 2). At a boundary, the rest of the effect is rescheduled, possibly onto another thread, so a long-running effect doesn't monopolize its thread and other effects get a chance to run. In other words, we declare where it's OK to be rescheduled onto another thread.
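IO.shift is the Cats Effect 2 API and was removed in Cats Effect 3; the closest equivalent there is IO.cede, which yields to the scheduler (the CE3 runtime also yields automatically every so often by default). A minimal sketch assuming CE 3.x: a CPU-bound counting loop (`countTo` is a hypothetical name) that inserts an explicit boundary every 1000 steps.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.IO
import cats.effect.unsafe.implicits.global // runtime needed to call unsafeRunSync

object CedeExample {
  // Hypothetical CPU-bound loop that counts up to n one step at a time.
  def countTo(n: Int): IO[Int] = {
    def loop(i: Int): IO[Int] =
      if (i >= n) IO.pure(i)
      else {
        val step = IO(i + 1)
        // Every 1000 steps, insert an explicit asynchronous boundary so other
        // fibers waiting on the same threads get a turn.
        val next = if (i % 1000 == 0) IO.cede *> step else step
        next.flatMap(loop)
      }
    loop(0)
  }
}
```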

Why have multiple contexts?

We may want to have separate thread pools and isolate effects from others.

Blocking (Cats Effect 2)

What happens when something blocks? A blocked thread can't do any other work, so we may want separate threads for blocking, so that effects on the other threads keep running. For example, some threads for computation only and other threads for blocking operations.

import cats.effect.{Blocker, ExitCode, IO, IOApp}
import debug._

object BlockingExample extends IOApp {
    def run(args: List[String]): IO[ExitCode] =
        Blocker[IO].use { blocker =>
            withBlocker(blocker).as(ExitCode.Success)
        }

    def withBlocker(blocker: Blocker): IO[Unit] =
        for {
            _ <- IO("on default").debug
            _ <- blocker.blockOn(IO("on blocker").debug)
            _ <- IO("where am I?").debug
        } yield ()
}

Result:

[ioapp-compute-0] on default
[cats-effect-blocker-0] on blocker
[ioapp-compute-1] where am I?

Summary

  1. Threads abstract over what is concurrently executing atop the available set of processors, so we can have many more threads than CPUs.
  2. Asynchronous boundaries help ensure applications make progress, and let effects resume on a different context in order to isolate various workloads from one another.
  3. IOApp provides a default ExecutionContext with a fixed number of threads, equal to the number of CPUs on the machine. This is meant for CPU-bound (non-blocking) work.
  4. Blocking, I/O-bound work should run in an unbounded thread pool. Cats Effect 2 provides the Blocker interface to declare effects that block; in Cats Effect 3 you use IO.blocking.
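Blocker was removed in Cats Effect 3. A sketch of the earlier blocking example in that style, assuming Cats Effect 3.x: IO.blocking marks the thunk as blocking so the runtime can run it without starving the compute pool. `tagged` is a hypothetical helper playing the role of `debug`; CE3 thread names differ from the CE2 output above, so no exact names are shown here.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.IO
import cats.effect.unsafe.implicits.global // runtime needed to call unsafeRunSync

object BlockingCE3 {
  // Hypothetical helper: tags a label with the name of the current thread.
  def tagged(label: String): String = s"[${Thread.currentThread.getName}] $label"

  val program: IO[List[String]] =
    for {
      a <- IO(tagged("on default"))
      // Thread.sleep stands in for a blocking call such as JDBC or file I/O.
      b <- IO.blocking { Thread.sleep(10); tagged("on blocker") }
      c <- IO(tagged("where am I?"))
    } yield List(a, b, c)
}
```

Printing the result of `BlockingCE3.program.unsafeRunSync()` shows which thread ran each step.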

Resources

What are they?

They are pieces of state. There is a cost to creating them. You need to clean them up when you are done using them.

  • examples: file handles, database connections
  • state with defined acquire and release lifecycles

Creating a Resource

Cats Effect captures this acquire/release lifecycle in a data structure, Resource, so it is easier to handle.

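/**
 * Simplified to IO here; the real method is generic in the effect type.
 * @param acquire effect that acquires the resource
 * @param release effect that cleans up the resource; it always runs at the end, provided acquisition succeeded
 */
def make[A](
    acquire: IO[A])(
    release: A => IO[Unit]
): Resource[IO, A]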

Once you have a resource, you can use it:

import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.implicits._
import debug._

object BasicResource extends IOApp {
    def run(args: List[String]): IO[ExitCode] =
        stringResource
            .use { s =>
                IO(s"$s is so cool!").debug
            }
            .as(ExitCode.Success)
    val stringResource: Resource[IO, String] =
        Resource.make(
            IO("> acquiring stringResource").debug *> IO("String")
        )(_ => IO("< releasing stringResource").debug.void)
}

Result:

[ioapp-compute-0] > acquiring stringResource
[ioapp-compute-0] String is so cool!
[ioapp-compute-0] < releasing stringResource
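A second sketch, closer to the file-handle case, assuming Cats Effect 3.x: a BufferedReader over an in-memory StringReader stands in for a real file, and the `closed` flag (an illustrative name) records that release ran after `use` finished.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global // runtime needed to call unsafeRunSync
import java.io.{BufferedReader, StringReader}

object FileLikeResource {
  var closed = false // records whether release has run

  // Acquire opens a reader (a StringReader stands in for a real file);
  // release closes it once `use` finishes, whether it succeeds or fails.
  val reader: Resource[IO, BufferedReader] =
    Resource.make(IO(new BufferedReader(new StringReader("first line\nsecond line"))))(r =>
      IO { r.close(); closed = true }
    )

  val firstLine: IO[String] = reader.use(r => IO.blocking(r.readLine()))
}
```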

Resources may be used for dependency management in applications, too.

You can also compose Resources serially and in parallel!

mapN, parMapN:

(Resource[IO, A], Resource[IO, B]) => ((A, B) => C) => Resource[IO, C]

Then, you can compose all your app's resources into one!
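A sketch of composing two resources with mapN, assuming Cats Effect 3.x. `tracked` is a hypothetical helper that logs each acquire and release, which makes the ordering visible: resources are acquired left to right and released in reverse order once `use` completes.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global // runtime needed to call unsafeRunSync
import cats.syntax.all._
import scala.collection.mutable.ListBuffer

object ComposedResources {
  val log: ListBuffer[String] = ListBuffer.empty

  // Hypothetical resource factory that records its lifecycle in `log`.
  def tracked(name: String): Resource[IO, String] =
    Resource.make(IO { log += s"acquire $name"; name })(n => IO { log += s"release $n"; () })

  // Serial composition: "db" is acquired first, "cache" second.
  val both: Resource[IO, (String, String)] =
    (tracked("db"), tracked("cache")).mapN((db, cache) => (db, cache))

  val program: IO[String] =
    both.use { case (db, cache) => IO { log += "use"; s"$db+$cache" } }
}
```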

Summary

  • Effects: maintain substitution by delaying execution
  • Side effects are bugs. We can't follow them easily...
  • IO: an effect that can safely capture any side effect
  • Parallel execution: the par- combinators (parMapN, parTraverse, parSequence), yay
  • Shifting contexts: distinguish blocking vs. non-blocking effects to improve throughput and make progress
  • Managing resources: separate acquire-release from use; dependency lifecycle management

And more: Fiber, IO.async, Ref, Deferred...

Next Steps