Every action has two results (Erlang edition)

Every action has two results: a set of side effects on the world, and the next version of ourselves.

I learned this from Erlang, a purely functional yet stateful programming language. Erlang uses actor-based concurrency. The language is fully immutable, yet the programs are not: every time an actor receives a message, it can send messages to other actors, and then return a different version of itself, instantiated with different state.

Here’s an example from the tutorial in the Erlang docs:

%%% This is the server process for the "messenger"
%%% the user list has the format [{ClientPid1, Name1},{ClientPid2, Name2},...]
server(User_List) ->
    receive
        {From, logon, Name} ->
            New_User_List = server_logon(From, Name, User_List),
            server(New_User_List);
%%% ...
    end.

This defines a server process (that is, an actor) which receives a logon message. When it does that, it builds a new list of users including the one it just received and all the ones it knew about before. Then it constructs a new version of itself with the new list! (That’s implicitly returned from receive.) The next message will be received by the new server.

It’s like that with us, too. Today I made coffee, with a side effect of some dirty cups and fewer coffee beans, and a next version of me that was more alert. Today I checked twitter, with a side effect of nothing observable, and a next version of me ready to write this post. Now I’m writing this post, which will have side effects with unknown consequences, depending on what y’all do with it.

This works in our teams, too. Every task we complete changes the world, and it changes us. Maybe we add tests or implement a feature. In the process, we learn about the software system we participate in. Did we do this as a team, or will we catch each other up later? Is changing the software safer or harder than before?

When “productivity” measures focus on externally-visible outcomes, sometimes the internal system is left in a terrible state. Burnout in people, “technical debt” in code, and a degeneration of the mental models that connect us with the code we care for.

The consequences of our work matter now. The next version of us matters for the whole future, for everything after now.

Provenance and causality in distributed systems

Can you take a piece of data in your system and say what version of code put it in there, based on what messages from other systems? and what information a human viewed before triggering an action?

Me neither.

Why is this acceptable? (because we’re used to it.)
We could make this possible. We could trace the provenance of data. And at the same time, mostly-solve one of the challenges of distributed systems.

Speaking of distributed systems…

In a distributed system (such as a web app), we can’t say for sure what events happened before others. We get into relativity-of-simultaneity complications even at short distances, because information travels through networks at unpredictable speeds. This means there is no one such thing as time, no single sequence of events that says what happened before what. There is time-at-each-point, and inventing a convenient fiction to reconcile them is a pain in the butt.

We usually deal with this by funneling every event through a single point: a transactional database. Transactions prevent simultaneity. Transactions are a crutch.

Some systems choose to apply an ordering after the fact, so that no clients have to wait their turn in order to write events into the system. We can construct a total ordering, like the one that the transactional database is constructing in realtime, as a batch process. Then we have one timeline, and we can use this to think about what events might have caused which others. Still: putting all events in one single ordering is a crutch. Sometimes, simultaneity is legit.

When two different customers purchase two different items from two different warehouses, it does not matter which happened first. When they purchase the same item, it still doesn’t matter – unless we only find one in inventory. And even then: what matters more, that Justyna pushed “Buy” ten seconds before Edith did, or that Edith upgraded to 1-day shipping? Edith is in a bigger hurry. Prioritizing these orders is a business decision. If we raise the time-ordering operation to the business level, we can optimize that decision. At the same time, we stop requiring the underlying system to order every event with respect to every other event.

On the other hand, there are events that we definitely care happened in a specific sequence. If Justyna cancels her purchase, that was predicated on her making it. Don’t mix those up. Each customer saw a specific set of prices, a tax amount, and an estimated ship date. These decisions made by the system caused (in part) the customer’s purchase. They must be recorded either as part of the purchase event, or as events that happened before the purchase.

Traditionally we record prices and estimated ship date as displayed to the customer inside the purchase. What if instead, we thought of the pricing decision and the ship date decision as events that happened before the purchase? and the purchase recorded that those events definitely happened before the purchase event?

We would be working toward establishing a different kind of event ordering. Did Justyna’s purchase happen before Edith’s? We can’t really say; they were at different locations, and neither influenced the other. That pricing decision though, that did influence Justyna’s purchase, so the price decision happened before the purchase.

This allows us to construct a more flexible ordering, something wider than a line.

Causal ordering

Consider a git history. By default, git log prints a line of commits as if they happened in that order — a total ordering.

But that’s not reality. Some commits happen before others: each commit I make is based on its parent, and every parent of that parent commit, transitively. So the parent happened before mine. Meanwhile, you might commit to a different branch. Whether my commit happened before yours is irrelevant. The merge commit brings them together; both my commit and yours happen before the merge commit, and after the parent commit. There’s no need for a total ordering here. The graph expresses that.

This is a causal ordering. It doesn’t care so much about clock time. It cares what commits I worked from when I made mine. I knew about the parent commit, I started from there, so it’s causal. Whatever you were doing on your branch, I didn’t know about it, it wasn’t causal, so there is no “before” or “after” relationship to yours and mine.

We can see the causal ordering clearly, because git tracks it: each commit knows its parents. The cause of each commit is part of the data in the commit.

Back to our retail example. If we record each event along with the events that caused it, then we can make a graph with enough of a causal ordering.

There are two reasons we want an ordering here: external consistency and internal legibility.

External Consistency

External consistency means that Justyna’s experience remains true. Some events are messages from our software system to Justyna (the price is $), and others are messages coming in (Confirm Purchase, Cancel Purchase). The sequence of these external interactions constrains any event ordering we choose. Messages crossing the system boundary must remain valid.

Here’s a more constricting example of external consistency: when someone runs a report and sees a list of transactions for the day, that’s an external message. That message is caused by all the transactions reported in it. If another transaction comes in late, it must be reported later as an amendment to that original report — whereas, if no one had run the report for that day yet, it could be lumped in with the other ones. No one needs to know that it was slow, if no one had looked.

Have you ever run a report, sent the results up the chain, and then had the central office accuse you of fudging the numbers because they run the same report (weeks later) and see different totals? This happens in some organizations, and it’s a violation of external consistency.

Internal Legibility

Other causal events are internal messages: we displayed this price because the pricing system sent us a particular message. The value of retaining causal information here is troubleshooting, and figuring out how our system works.

I’m using the word “legibility”[1] in the sense of “understandability”: as people, we have visibility into the system’s workings; we can follow along with what it’s doing, distinguish its features, locate problems, and change it.

 If Justyna’s purchase event is caused by a ship date decision, and the ship date decision (“today”) tracked its causes (“the inventory system says we have one, with more arriving today”), then we can construct a causal ordering of events. If Edith’s purchase event tracked a ship date decision (“today”) which tracked its causes (“the inventory system says we have zero, with more arriving today”), then we can track a problem to its source. If in reality we only send one today, then it looks like the inventory system’s shipment forecasts were inaccurate.

How would we even track all this?

The global solution to causal ordering is: for every message sent by a component in the system, record every message received before that. Causality at a point-in-time-at-a-point-in-space is limited to information received before that point in time, at that point in space. We can pass this causal chain along with the message.

“Every message received” is a lot of messages. Before Justyna confirmed that purchase, the client component received oodles of messages, from search results, from the catalog, from the ad optimizer, from the review system, from the similar-purchases system, from the categorizer, many more. The client received and displayed information about all kinds of items Justyna did not purchase. Generically saying “this happened before, therefore it can be causal, so we must record it ALL” is prohibitive.

This is where business logic comes in. We know which of these are definitely causal. Let’s pass only those along with the message.
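
Here’s a sketch of what that could look like. All of the names and fields are invented for illustration: a purchase event that carries only the events business logic says are causal, rather than everything the client ever received.

sealed trait CausalEvent
case class PricingDecision(item: String, price: BigDecimal) extends CausalEvent
case class ShipDateDecision(item: String, shipDate: String) extends CausalEvent

case class PurchaseConfirmed(
  customer: String,
  item: String,
  causes: Seq[CausalEvent]) // only the events we know influenced this purchase

val purchase = PurchaseConfirmed(
  customer = "Justyna",
  item = "garden gnome",
  causes = Seq(
    PricingDecision("garden gnome", BigDecimal("35.99")),
    ShipDateDecision("garden gnome", "today")))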

There are others that might be causal. The ad optimizer team probably does want to know which ads Justyna saw before her purchase. We can choose whether to include that with the purchase message, or to reconstruct an approximate timeline afterward based on clocks in the client or in the components that persist these events. For something as aggregated as ad optimization, approximate is probably good enough. This is a business tradeoff between accuracy and decoupling.

Transitive causality

How deep is the causal chain passed along with a message?

We would like to track backward along this chain. When we don’t like the result of Justyna and Edith’s purchase fulfillment, we trace it back. Why did the inventory system say the ship date would be today in both cases? This decision is an event, with causes of “The current inventory is 1” and “Normal turnover for this item is less than 1 per day”; or “The current inventory is 0” and “a shipment is expected today” and “these shipments usually arrive in time to be picked the same day.” From there we can ask whether the decision was valid, and trace further to learn whether each of these inputs was correct.

If every message comes with its causal events, then all of this data is part of the “Estimated ship date today” sent from the inventory system to the client. Then the client packs all of that into its “Justyna confirmed this purchase” event. Even with slimmed-down, business-logic-aware causal listings, messages get big fast.

Alternately, the inventory system could record its decision, and pass a key with the message to the client, and then the client only needs to retain that key. Recording every decision means a bunch of persistent storage, but it doesn’t need to be fast-access. It’d be there for troubleshooting, and for aggregate analysis of system performance. Recording decisions along with the information available at the time lets us evaluate those decisions later, when outcomes are known.
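
A sketch of that alternative, with invented names: the inventory system records its decision, with the inputs it had at the time and the code version that made it, and passes only a key downstream.

import java.util.UUID

case class DecisionKey(value: UUID)

case class ShipDateDecision(
  key: DecisionKey,
  shipDate: String,
  inputs: Seq[String],  // e.g. "current inventory is 0", "shipment expected today"
  codeVersion: String)  // which version of the code made this decision

// the inventory system persists the decision somewhere durable (not necessarily fast)...
def recordDecision(decision: ShipDateDecision): Unit = () // stand-in for the decision store

// ...and the message to the client carries only the key
case class EstimatedShipDate(shipDate: String, decision: DecisionKey)

val key = DecisionKey(UUID.randomUUID())
recordDecision(ShipDateDecision(key, "today",
  Seq("current inventory is 0", "shipment expected today"), "inventory-service 1.4.2"))
val message = EstimatedShipDate("today", key)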

Incrementalness

A system component that chooses to retain causality in its events has two options: repeat causal inputs in the messages it sends outward; or record the causal inputs and pass a key in the messages it sends outward.

Not every system component has to participate. This is an idea that can be rolled out gradually. The client can include in the purchase event as much as it knows: the messages it received, decisions it made, and relevant messages sent outward before this incoming “Confirm Purchase” message was received from Justyna. That’s useful by itself, even when the inventory system isn’t yet retaining its causalities.

Or the inventory system could record its decisions, the code version that made them, and the inputs that contributed to them, even though the client doesn’t retain the key it sends in the message. It isn’t as easy to find the decision of interest without the key, but it could still be possible. And some aggregate decision evaluation can still happen. Then as other system components move toward the same architecture, more benefits are realized.

Conscious Causal Ordering

The benefits of a single, linear ordering of events are consistency, legibility, and visibility into what might be causal. A nonlinear causal ordering gives us more flexibility, consistency, a more accurate but less simplified legibility, and clearer visibility into what might be causal. Constructing causal ordering at the generic level of “all messages received cause all future messages sent” is expensive and also less meaningful than a business-logic-aware, conscious causal ordering. This conscious causal ordering gives us external consistency, accurate legibility, and visibility into what we know to be causal.

At the same time, we can have provenance for data displayed to the users or recorded in our databases. We can know why each piece of information is there, and we can figure out what went wrong, and we can trace all the data impacted by an incorrect past event.

I think this is something we could do, it’s within our ability today. I haven’t seen a system that does it, yet. Is it because we don’t care enough — that we’re willing to say “yeah, I don’t know why it did that, can’t reproduce, won’t fix”? Is it because we’ve never had it before — if we once worked in a system with this kind of traceability, would we refuse to ever go back?


[1] This concept of “legibility” comes from the book Seeing Like a State.

Abstractions over Threads in Java and Scala

TL;DR In Java, get a library that makes Futures work like Scala’s, and then never use ExecutorService directly.

In the beginning, there were Threads. And Java threads were nice and simple. That is, Java threads are simple like some assembly languages are simple: there’s only a few things you can do.

Since then, Java and then Scala have created higher-level abstractions. These are what you want to use. This post explains the differences between them, and what happens when exceptions are thrown.

Java’s Executor, introduced in Java 5, puts thread pooling behind a simple interface. The only method on an Executor is execute(Runnable). That’s simple too! Give it something to do, and it eventually does it. If an exception trickles up, it goes to the thread’s UncaughtExceptionHandler, which typically prints the stack trace to System.err.

All the implementations provided in Executors also implement ExecutorService, a more extensive interface. Pass the submit() method a Callable or a Runnable, and get back a java.util.concurrent.Future. Please note that Java’s Future is limited. You can’t ask it to do anything on completion or failure. You can pretty much only call get(), which blocks until your task is complete, then returns its result or throws its exception.[1]

If you submitted a task for its side effects, and you never call get() on the Java Future, then no one will ever know about any Exception it throws. It never makes it to the Thread’s UncaughtExceptionHandler, and it never gets output. To get an ExecutorService that never hides exceptions, extend ThreadPoolExecutor, override afterExecute and guarantee that get() is called. What a pain!
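
Here’s roughly what that override could look like, sketched in Scala like the rest of the examples here; the class name is mine. It checks each completed task and prints any exception instead of swallowing it.

import java.util.concurrent.{CancellationException, ExecutionException, Future,
  LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

class LoudThreadPool(nThreads: Int)
  extends ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue[Runnable]()) {

  override def afterExecute(r: Runnable, t: Throwable): Unit = {
    super.afterExecute(r, t)
    var thrown = t
    if (thrown == null) r match {
      case f: Future[_] if f.isDone =>
        try f.get()
        catch {
          case e: CancellationException => thrown = e
          case e: ExecutionException    => thrown = e.getCause
          case _: InterruptedException  => Thread.currentThread().interrupt()
        }
      case _ => ()
    }
    if (thrown != null) thrown.printStackTrace() // no more silently vanishing exceptions
  }
}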

Now I’ll switch over to Scala-land, because it has something to tell us about Java Futures.

Scala’s ExecutionContext interface (trait) extends Executor, providing that execute() method. You’ll probably never use this directly, but you’ll need one to work with Scala Futures. There are two good ways to get it. First, use the default ExecutionContext.global; it’s good. Second, if you want your own thread pool, the factory ExecutionContext.fromExecutorService creates a thin wrapper that delegates to your carefully chosen ExecutorService.

To start an asynchronous task in Scala, call

val doStuff = Future { /* do stuff */ } (executionContext)

This will execute the stuff on that executionContext[2], and give you a reference that’s useful.

When you make a Java Future by submitting on an ExecutorService, you have to pass in the whole sequence of code that you want executed. All the error handling has to be there. When you want to do something after that asynchronous code completes, there’s nothing to do but block until it completes.

Scala Futures remove that restriction. You can start something going, then add error handling by calling onFailure.[3] You can extend the asynchronous work with onSuccess. You can even say, “after these three Futures complete, then do this other thing with all three results.” This lets you separate deciding what needs to happen in what order from defining each thing that needs to happen. Yay separation of concerns! I like how this style of programming lets me code the interesting bits first and then fill in the rest.
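
For instance, here’s a sketch of combining three Futures and handling both outcomes; fetchPrice, fetchTax, and fetchShipDate are invented stand-ins for real asynchronous calls.

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import ExecutionContext.Implicits.global

def fetchPrice: Future[BigDecimal] = Future { BigDecimal("35.99") }
def fetchTax: Future[BigDecimal] = Future { BigDecimal("2.88") }
def fetchShipDate: Future[String] = Future { "today" }

val price = fetchPrice
val tax = fetchTax
val shipDate = fetchShipDate

// "after these three Futures complete, then do this other thing with all three results"
val summary: Future[String] = for {
  p <- price
  t <- tax
  d <- shipDate
} yield s"total ${p + t}, ships $d"

summary.onComplete {
  case Success(s)   => println(s)
  case Failure(err) => println(s"something went wrong: $err")
}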

All these Future-extending and Future-combining services create asynchronous computations of their own, and want an ExecutionContext. This does not have to be the same one the Future is running on. Once a Future is constructed, it does not remember the ExecutionContext.

A task tacked on to another Future will automatically run when it can. Failures will be handled, successes will proceed. This means you aren’t required to ask a Scala Future for its result. It’s possible to do so (and I often do in test code), but discouraged. If you want to do something with the value, use onSuccess. You never have to block a thread!

We can work this way in Java too. In Java 8 there’s native support (CompletableFuture); before that, we can use alternative futures provided in libraries such as Guava. Use this to define asynchronous tasks in smaller, more flexible bits.

This culminates a series of posts on choosing the right ExecutorService. See also Pool-Induced Deadlock, ForkJoinPool, and Scala’s global ExecutionContext.

For Scala developers:
[3] I said that Scala futures let you handle errors with onFailure. This isn’t true for what Scala considers Fatal errors; these remain uncaught. They propagate to the UncaughtExceptionHandler[4], which prints to stderr, and that’s it. The thread dies. Your onComplete, onFailure, onSuccess methods, they’re never called. Silent death. If you Await its result, the Await will time out. Very bad! In the Scala source as of this writing, this happens only for very serious errors: VirtualMachineError, ThreadDeath, InterruptedException, LinkageError, ControlThrowable. However, in Scala 2.10.x, NotImplementedError is “fatal”. When I left a method as ???, the thread disappeared and my program hung. That took forever to debug.

One alternative is to use scalaz.

The scalaz library provides its own Future. The scalaz.concurrent.Future wants an ExecutorService. (This means you can’t use the global ExecutionContext.) Some important differences:
* scalaz defaults the implicit ExecutorService parameter to one with a FixedThreadPool. Because you aren’t required to supply one at all, you don’t always realize you’re using that default.
* Because scalaz calls submit() on the ExecutorService, uncaught exceptions do not hit the UncaughtExceptionHandler and are never printed. Do not use scalaz’s Future directly: use Task instead, which wraps everything in a try {} catch {}.
* In the standard constructor of Task {…} (and Future { … }), the work is not submitted immediately. It is submitted on a call to run or attemptRun.
* Also, if you use this standard constructor, then every time you run a Task, the work will be repeated (see the sketch below). This is not true of Scala’s Futures; those run exactly once.
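
A tiny sketch of that re-running behavior, using the default pool:

import scalaz.concurrent.Task

val t = Task { println("doing the work") }

t.run // prints "doing the work"
t.run // prints it again: the work repeats on every run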

Hopefully, once you choose a good abstraction, you won’t have to think about this stuff ever again.

—————-
[1] You can also cancel a Java Future, if you care about that.
[2] If it’s the global, it’ll sneakily fork a ForkJoinTask.
[3] is in the “For Scala developers” bit above
[4] The behavior of the UncaughtExceptionHandler can be configured on Threads created in a ThreadFactory that you supply to an ExecutorService of your own construction that you then wrap in an ExecutionContext. And good luck figuring out anything more useful than printing them.

ForkJoinPool: the Other ExecutorService

In Java, an ExecutorService manages a pool of threads that can run tasks. Most ExecutorServices treat all tasks the same. Somebody hands it something to do, the ExecutorService parcels it out to a thread, the thread runs it. Next!

A ForkJoinPool is an ExecutorService that recognizes explicit dependencies between tasks. It is designed for the kind of computation that wants to run in parallel, and then maybe more parallel, and then some of those can run in parallel too. Results of the parallel computations are then combined, so it’s like the threads want to split off and then regroup.

Maybe it’s a computation like, “What’s the shortest path of followers from me to @richhickey?” or “What is the total size of all files in all directories?” or “What’s the most relevant ad to display to this customer?” where we don’t know what all we’re going to have to execute until we’re in the middle of executing it.

On an ordinary ExecutorService, when we split a computation up, each task goes its separate way. Each one is allocated to a thread individually. This becomes a problem when the tasks are small, and the overhead of allocating them to threads takes longer than running them. It becomes a bigger problem when threads split off tasks and wait for all the results to come back to combine them: pretty soon so many threads are waiting that there are no more threads to do the work. This can reach deadlock.

ForkJoinPool embraces many small computations that spawn off and then come back together. It says, “When my thread wants to split its work into many small computations, it shall create them, and then start working on them. If another thread wants to come along and help, great.”

A computation in a ForkJoinPool is like a mother who told all her children to clean the house. While she’s waiting for them to finish their level on the video game she starts picking up. Eventually some kids get up and start helping. When Evelyn starts sweeping and isn’t done by the time Linda has finished the bathroom, then Linda picks up a broom and helps. Eventually the mother takes stock and says, “Hurray! The house is clean.”

That’s a completely unrealistic scenario in my household, but ForkJoinPools are more disciplined than my children. They support unpredictable parallel computation, prevent pool-induced deadlock, and minimize the work of switching back and forth between threads on the CPU.

What’s not to love? Well, a ForkJoinPool is harder to use than a regular old ExecutorService. It’s more complicated than calling “submit.” External threads submit jobs to a ForkJoinPool in an ordinary way, but within the pool, tasks are created differently. ForkJoinTask subclasses get constructed, forked off for execution, and then joined. It’s custom handling, and that requires planning ahead, and that means you have to guess that ForkJoinPool is the solution before you start coding. Or retrofit it later.
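
Here’s a sketch of that custom handling: a RecursiveTask that sums a big array by splitting the range in half until the pieces are small enough to just compute. The threshold and sizes are arbitrary.

import java.util.concurrent.{ForkJoinPool, RecursiveTask}

class SumTask(numbers: Array[Long], from: Int, until: Int) extends RecursiveTask[Long] {
  override def compute(): Long =
    if (until - from <= 1000) {
      // small enough: just add them up on this thread
      var sum = 0L
      var i = from
      while (i < until) { sum += numbers(i); i += 1 }
      sum
    } else {
      // too big: fork the left half, compute the right half, then join
      val mid = (from + until) / 2
      val left = new SumTask(numbers, from, mid)
      val right = new SumTask(numbers, mid, until)
      left.fork()
      right.compute() + left.join()
    }
}

val pool = new ForkJoinPool()
val numbers = Array.tabulate(1000000)(_.toLong)
val total = pool.invoke(new SumTask(numbers, 0, numbers.length))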

Scala does something clever to hide the difference between ForkJoinPools and regular ExecutorServices, so that its Futures work this way by default. Akka uses ForkJoinPools behind the scenes for its actor messaging. Clojure uses ForkJoinPools in its collection processing with Reducers. In Scala and Clojure, you can get these optimizations without extra headache. The abstractions, they keep getting deeper!

————-
Doug Lea wrote ForkJoin for Java and Scala. http://gee.cs.oswego.edu/dl/papers/fj.pdf

Scala: the global ExecutionContext makes your life easier

TL;DR – when in doubt, stick with scala.concurrent.ExecutionContext.global

When you want to run some asynchronous code, choosing a thread pool isn’t any fun. Scala has your back, with its global ExecutionContext.

When I try to put some code in a Future without specifying where to run it, Scala tells me what to do:

scala> Future(println("Do something slow"))
<console>:14: error: Cannot find an implicit ExecutionContext, either require one yourself or import ExecutionContext.Implicits.global
There are some good reasons to use that recommended ExecutionContext. It tries to do things right in several ways. See below for how you can help it along.

The global ExecutionContext has an objective of keeping your CPU busy while limiting time spent context switching between threads. To do this, it starts up a ForkJoinPool[3] whose desired degree of parallelism is the number of CPUs.[1]

ForkJoinPool is particularly smart, able to run small computations with less overhead. It’s more work for its users, who must implement each computation inside a ForkJoinTask. Scala’s global ExecutionContext takes this burden from you: any task submitted to the global context from within the global context is quietly wrapped in a ForkJoinTask.

But wait, there’s more! We also get special handling for blocking tasks. Scala’s Futures resist supplying their values unless you pass them to Await.result(). That’s because Future knows that its result may not be available yet, so this is a blocking call. The Await object wraps the access in scala.concurrent.blocking { … }, which passes the code on to BlockingContext.

The BlockingContext object says, “Hey, current Thread, do you have anything special to do before I start this slow thing?” and the special thread created inside the global ExecutionContext says, “Why yes! I’m going to tell the ForkJoinPool about this.”

The thread’s block context defers to the managedBlock method in ForkJoinPool, which activates the ForkJoinPool’s powers of compensation. ForkJoinPool is trying to keep the CPU busy by keeping degree-of-parallelism threads computing all the time. When informed that one of those threads is about to block, it compensates by starting an additional thread. This way, while your thread is sitting around, a CPU doesn’t have to. As a bonus, this prevents pool-induced deadlock.

In this way, Scala’s Futures and its global ExecutionContext work together to keep your computer humming without going Thread-wild. You can invoke the same magic yourself by wrapping any Thread-hanging code in blocking { … }.[2]
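
A sketch of doing that yourself; the sleep is a stand-in for I/O or any other slow, blocking call.

import scala.concurrent._
import ExecutionContext.Implicits.global

val slowThing: Future[String] = Future {
  blocking {
    Thread.sleep(5000) // tell the pool this thread is about to sit around
    "finally done"
  }
}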

All this makes scala.concurrent.ExecutionContext.global an excellent general-purpose ExecutionContext.

When should you not use it? When you’re writing an asynchronous library, or when you know you’re going to do a lot of blocking, declare your own thread pool. Leave the global one for everyone else.

Also, on mobile: Daniel Solano-Gómez reports: On startup, the global execution context “has to read its configuration.  In many apps, that’s probably fine, but in an Android app it becomes a problem.  I was trying to use Futures to avoid doing I/O on the main thread, so it was a little bit of a surprise that doing that caused I/O on the main thread…. In the end, I just created my own based on an executor service.”

———-
[1] You can alter this: set scala.concurrent.context.numThreads to a hard number or to a multiple, such as “x2” for double your CPUs. The documentation is the source code.
[2] Here’s some code that illustrates using blocking { … } to get the global ExecutionContext to start extra threads.
[3] Scala has its own ForkJoinPool implementation, because Java doesn’t get it until 7, and Scala runs on Java 6 or higher.

Choosing an ExecutorService

TL;DR:

When you need an ExecutorService, the Executors class has several for you. Sometimes it doesn’t matter which you choose, and other times it matters a LOT. The above flow chart is approximate. If it’s simple parallel computation you’re doing, then a fixed pool with as many threads as CPUs works. If those computations start other computations, then you’ll want a ForkJoinPool (that’s another post). If the purpose of threading is to avoid blocking on I/O, that’s different. Maybe you don’t want to limit the number of threads that can wait on I/O, and then Executors.newCachedThreadPool is a good choice.
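
A rough sketch of those three starting points in code; the sizes are assumptions.

import java.util.concurrent.{Executors, ForkJoinPool}

val cpuBound  = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors)
val recursive = new ForkJoinPool()              // computations that spawn more computations
val ioBound   = Executors.newCachedThreadPool() // don't limit threads that wait on I/O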

When you would like to limit the number of threads you start AND your tasks might start other tasks in the same thread pool, then you must worry about Pool Induced Deadlock. Then you need to think about what happens when one task needs to start another. It’s time to get into the nitty-gritty of a ThreadPoolExecutor. This is what the methods on Executors construct for you under the hood, and you can make your own for finer control. To choose them, you need to understand what the ThreadPoolExecutor does when a new task comes in.

Here’s a nice new ThreadPoolExecutor.

Some tasks come in, Runnables passed through execute or submit. At first the ThreadPoolExecutor happily starts a new thread per task. It does this up to the core pool size. Note that even if a thread is idle, it’ll still start a new thread per task until the core number of threads are running. These stay up until the pool shuts down, unless you configure it otherwise.[1]

If all the core threads are busy, then the ThreadPoolExecutor will begin storing Runnables in its queue. The BlockingQueue passed in at construction can have capacity 0, MAX_INT, or anywhere in between. Note that Runnables are stored here even if the maximum number of threads are not running. This can cause deadlock if the tasks in the core threads are waiting on tasks they submitted.

Only if the queue is full will the ThreadPoolExecutor start more than the core number of threads. It’ll start them up to the maximum pool size, only as long as the queue is full and more tasks are coming in.

Finally, if there’s no more room in the queue and no more threads allowed, submitting a task to this ThreadPoolExecutor will throw RejectedExecutionException. (You can configure it to drop work, or to tell the calling thread to do its own task instead.[2])
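
For instance, here’s a sketch of the “calling thread does its own task” option, using the built-in CallerRunsPolicy; the sizes are arbitrary.

import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}

val politePool = new ThreadPoolExecutor(
  2, 4, 30, TimeUnit.SECONDS,
  new ArrayBlockingQueue[Runnable](10),
  new ThreadPoolExecutor.CallerRunsPolicy()) // rejected tasks run on the submitting thread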

The pictured case is an unusual one, where core size, queue capacity, and max size are all nonzero and finite. A more common case is FixedThreadPool, with a fixed number of threads and effectively infinite queue. Threads will start up and stay up, and tasks will wait their turns. The other common case is CachedThreadPool, with an always-empty queue and effectively infinite threads. Here, the threads will time out when they’re not busy.

If you need something in between, you can construct it yourself. The fixed thread pool is good to put a limit on I/O or CPU context switches. The cached one avoids pool-induced deadlock. If you’re doing interesting recursive calculations, then look into ForkJoinPool.

Aside: All of these ExecutorServices will, by default, start Threads that keep your application alive until they’re done. If you don’t like that, build a ThreadFactory that sets them up as daemon threads, and pass it in when you create the ExecutorService.
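
A sketch of such a ThreadFactory: wrap the default factory and mark each new thread as a daemon.

import java.util.concurrent.{Executors, ThreadFactory}

val daemonFactory = new ThreadFactory {
  private val default = Executors.defaultThreadFactory()
  def newThread(r: Runnable): Thread = {
    val t = default.newThread(r)
    t.setDaemon(true) // this thread won't keep the application alive
    t
  }
}

val quietPool = Executors.newFixedThreadPool(4, daemonFactory)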

Bonus material: Scala REPL code that I used to poke around and check these out.
If you’re in Scala, a great way to create a thread pool is Akka’s ThreadPoolBuilder.

Example: The thermometer-looking drawings represent tasks either in process or queued inside the ExecutorService. Red and green represent tasks in process, and amber ones are sitting in the queue. As tasks are completed, the level drops. As tasks are submitted to the ExecutorService, the level goes up.

If I set up an ExecutorService like this:

new ThreadPoolExecutor(
  5, // core pool size
  8, // max pool size, for at most (8 - 5 = 3) red threads
  10, TimeUnit.SECONDS, // idle red threads live this long
  new ArrayBlockingQueue(7)); // queue with finite maximum size

Assuming no tasks complete yet: The first 5 tasks submitted will start up threads; the next 7 tasks will queue; the next 3 tasks coming in will cause more threads to be started. (Those last 3 tasks don’t go through the queue; each becomes the first task of a newly started thread, while the 7 queued tasks keep waiting.) Any more submission attempts throw RejectedExecutionException.

——
[1] Core threads will be closed when idle only if you set that up: threadPoolExecutor.allowCoreThreadTimeout(true)
[2] Do this by supplying a different RejectedExecutionHandler in the constructor.

Fun with Pool-Induced Deadlock

Did you know that a thread can achieve deadlock with itself? It can happen in any thread pool of constrained size. Watch out for… Pool-Induced Deadlock! [1]

This is easiest in a pool of size 1. Run a task in the pool, and from there, run a task in the same pool. While the outer task waits for the inner to complete, the inner waits for that thread to become available.

This is easy to show in scalaz.

import scalaz.concurrent.Task
import java.util.concurrent.Executors

val es1 = Executors.newFixedThreadPool(1)
def sayHiInTheFuture = Task { println("Hi!") }(es1)

val poolInducedDeadlock = Task { sayHiInTheFuture.run } (es1)
poolInducedDeadlock.run

… and your REPL session is dead. You’ll have to Ctrl-C it. It never even says “Hi!” to you.

Why would you run a Task within a Task? We should use flatMap instead, to extend the calculation. That is trivial in this example, but the inner Task-run might be buried inside functions or library calls, and not obvious or extractable.
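
For contrast, here’s a sketch of the flatMap version; it completes even on a single-thread pool, because no pool thread ever blocks waiting for another pool task.

import scalaz.concurrent.Task
import java.util.concurrent.Executors

val es2 = Executors.newFixedThreadPool(1)
val sayHi = Task { println("Hi!") }(es2)

// extend the first Task instead of running a Task from inside it
val noDeadlock = Task { println("first step") }(es2).flatMap(_ => sayHi)
noDeadlock.run // prints "first step", then "Hi!"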

Why would you ever run those Tasks on the same single-thread pool? Perhaps you don’t pass an ExecutorService into Task at all; it has a default. The default has as many threads as your computer has CPUs. Run on a machine with only one CPU, and herk. If your laptop has eight CPUs and the build server has one, you’ll be surprised when your build hangs.
Or if your build server has four CPUs, then the test that runs four of these Tasks at the same time will freeze up. And it won’t only freeze its own work: anything that wants to run on that default pool hangs forever.
This shows how side effects are dangerous: that call to Runtime.getRuntime.availableProcessors in the default executor service seems reasonable, but it’s reaching into the outside world. Switching machines then changes the behaviour of your code.

Native Scala futures have two ways of preventing Pool-Induced Deadlock: timeouts and a magic execution context. Here is a similar example using Scala futures, this time submitting as many nested futures as we have processors:

import scala.concurrent._
import duration._
import java.util.concurrent.Executors

val n = Runtime.getRuntime.availableProcessors
val ecn = ExecutionContext.fromExecutorService(
             Executors.newFixedThreadPool(n))
// val ecn = ExecutionContext.global
def sayHiInTheFuture = future { println("Hi!") }(ecn)

val futures = Range(0,n).map { _ =>

  future { 
    try { 
      Await.ready(sayHiInTheFuture, 5 seconds) 
    } catch {
      case t: TimeoutException => println("Poo!")
    }
  } (ecn)
}

You’ll see Poo before it ever says Hi, since Await.ready times out. It’s submitting the new future to its own thread pool, then blocking until it returns. Meanwhile, sayHiInTheFuture is waiting for its turn on the single thread. Good thing there’s a timeout! The timeout solves the deadlock, and it’s required by the Await call. Scala believes that waiting for futures is evil.

There’s another reason you’re less likely to see pool-induced deadlock in Scala futures: the suggested ExecutionContext is scala.concurrent.ExecutionContext.Implicits.global, which is magic. Try the above example with the global context instead, and it works fine: n Hi’s and no Poo at all.

I can get Pool-Induced Deadlock on the global context, but only if I don’t use Await.[2] This can be achieved; we hit it at work using scalaz.

How can we avoid this danger? In Scala, we can always use Await and the global context, if we’re not blocking on I/O or anything else slow. In Java or Scala, we can choose cached thread pools without fixed limits. Any library or deep function that blocks on a Task or future should use an ExecutorService of its own. See the next post for suggestions.
Threading is hard, at least on the JVM.

————
[1] described in Programming Concurrency on the JVM, by @venkat_s

[2] Here’s a simple example. scalaz uses a CountDownLatch internally on its Futures.

import scala.concurrent._

import java.util.concurrent.CountDownLatch

val n = Runtime.getRuntime.availableProcessors
val ecng = ExecutionContext.global

val futures = Range(0,n).map { _ =>

  future { 
      val c = new CountDownLatch(1)
      future { c.countDown }(ecng)
      c.await  
      println("Something!")
  } (ecng)
}