Wednesday, January 29, 2014

Fun with Pool-Induced Deadlock


Did you know that a thread can achieve deadlock with itself? It can happen in any thread pool of constrained size. Watch out for... Pool-Induced Deadlock! [1]

This is easiest in a pool of size 1. Run a task in the pool, and from there, run a second task in the same pool and wait for its result. While the outer task waits for the inner to complete, the inner waits for that one thread to become available.

This is easy to show in scalaz.

import scalaz.concurrent.Task
import java.util.concurrent.Executors

val es1 = Executors.newFixedThreadPool(1)
def sayHiInTheFuture = Task { println("Hi!") }(es1)

val poolInducedDeadlock = Task { sayHiInTheFuture.run } (es1)
poolInducedDeadlock.run

... and your REPL session is dead. You'll have to Ctrl-C it. It never even says "Hi!" to you.

Why would you run a Task within a Task? We should use flatMap instead, to extend the calculation. That is trivial in this example, but the inner Task-run might be buried inside functions or library calls, and not obvious or extractable.
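
As a rough sketch of the flatMap approach, here is the same shape written with standard-library Futures rather than scalaz Task, so it runs without extra dependencies. That swap is an assumption made for illustration, and the names pool, ec1, sayHi and chained are hypothetical. The point is only that chaining never blocks a pool thread, so even a pool of size 1 is enough.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A single-thread pool, as in the deadlocking example (names are illustrative).
val pool = Executors.newFixedThreadPool(1)
implicit val ec1: ExecutionContext = ExecutionContext.fromExecutorService(pool)

def sayHi: Future[String] = Future { "Hi!" }

// flatMap schedules the inner future only after the outer one has finished
// and released the thread, so nothing inside the pool ever waits.
val chained: Future[String] = Future { "outer done" }.flatMap(_ => sayHi)

// Blocking here is fine: the caller is not one of the pool's threads.
val result = Await.result(chained, 5.seconds)
println(result)

pool.shutdown()
```

The only blocking call sits at the very edge, outside the pool, which is where it belongs.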

Why would you ever run those Tasks on the same single-thread pool? Perhaps you don't pass an ExecutorService into Task at all; it has a default. The default has as many threads as your computer has CPUs. Run on a machine with only one CPU, and herk. If your laptop has eight CPUs and the build server has one, you'll be surprised when your build hangs.
Or if your build server has four CPUs, then the test that runs four of these Tasks at the same time will freeze up. And it won't only freeze its own work: anything that wants to run on that default pool hangs forever.
This shows how side effects are dangerous: that call to Runtime.getRuntime.availableProcessors in the default executor service seems reasonable, but it's reaching into the outside world. Switching machines then changes the behaviour of your code.

Native Scala futures have two ways of preventing Pool-Induced Deadlock: timeouts and a magic execution context. Here is a similar example using Scala futures, this time submitting as many nested futures as we have processors:

import scala.concurrent._
import duration._
import java.util.concurrent.Executors

val n = Runtime.getRuntime.availableProcessors
val ecn = ExecutionContext.fromExecutorService(
             Executors.newFixedThreadPool(n))
// val ecn = ExecutionContext.global
def sayHiInTheFuture = Future { println("Hi!") }(ecn)

// Occupy every thread in the pool with a future that blocks on another future.
val futures = Range(0, n).map { _ =>
  Future {
    try {
      Await.ready(sayHiInTheFuture, 5.seconds)
    } catch {
      case t: TimeoutException => println("Poo!")
    }
  }(ecn)
}

You'll see Poo before it ever says Hi, since Await.ready times out. Each outer future submits a new future to its own thread pool, then blocks until that one completes. Meanwhile, every sayHiInTheFuture is waiting for its turn on one of the n threads, all of which are occupied by the blocked outer futures. Good thing there's a timeout! The timeout breaks the deadlock, and the Await call makes you supply one. Scala believes that waiting for futures is evil.

There's another reason you're less likely to see pool-induced deadlock in Scala futures: the suggested ExecutionContext is scala.concurrent.ExecutionContext.Implicits.global, which is magic. Try the above example with the global context instead, and it works fine: n Hi's and no Poo at all.

I can get Pool-Induced Deadlock on the global context, but only if I don't use Await.[2] This can be achieved; we hit it at work using scalaz.

How can we avoid this danger? In Scala, we can always use Await and the global context, if we're not blocking on I/O or anything else slow. In Java or Scala, we can choose cached thread pools without fixed limits. Any library or deep function that blocks on a Task or future should use an ExecutorService of its own. See the next post for suggestions.
Threading is hard, at least on the JVM.
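
Here is a minimal sketch of the "pool of its own" advice, under the assumption that the blocking code lives in some library function. The names appPool, libPool and libraryCall are hypothetical and exist only for this illustration. The caller's pool still has a single thread, but the inner future runs on a separate cached pool, so the wait can always be satisfied.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// The application's constrained pool: one thread only.
val appPool = Executors.newFixedThreadPool(1)
val appEc = ExecutionContext.fromExecutorService(appPool)

// The library's own pool; a cached pool grows on demand, with no fixed limit.
val libPool = Executors.newCachedThreadPool()
val libEc = ExecutionContext.fromExecutorService(libPool)

// A hypothetical library function that blocks on a future internally.
def libraryCall(): String =
  Await.result(Future { "Hi!" }(libEc), 5.seconds)

// The outer task holds the only app thread, yet the inner one still runs.
val outer = Future { libraryCall() }(appEc)
val result = Await.result(outer, 5.seconds)
println(result)

appPool.shutdown()
libPool.shutdown()
```

The trade-off is that an unbounded pool can create a lot of threads under load, so this is a way out of the deadlock rather than a free lunch.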

------------
[1] described in Programming Concurrency on the JVM, by @venkat_s

[2] Here's a simple example. scalaz uses a CountDownLatch internally on its Futures.

import scala.concurrent._

import java.util.concurrent.CountDownLatch

val n = Runtime.getRuntime.availableProcessors
val ecng = ExecutionContext.global

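// Each outer future waits on a latch that only its inner future can release.
// Once all n threads are waiting, no inner future can ever be scheduled.
val futures = Range(0, n).map { _ =>
  Future {
    val c = new CountDownLatch(1)
    Future { c.countDown() }(ecng)
    c.await()
    println("Something!")
  }(ecng)
}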
