
This is a follow-up to this post: Why are inner parallel streams faster with new pools than with the commonPool for this scenario?

Apologies in advance for the wall of text. This is for JDK 17.

I am currently testing methods with these scenarios:

  • An outer parallelStream, and an inner nested parallelStream that uses ManagedBlocker to call Thread::sleep
  • An outer parallelStream, and an inner nested parallelStream that uses a different FJPool, but also uses ManagedBlocker to call Thread::sleep

Here's how the code looks:

import java.time.Duration;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;

public class NestedPerf {
  private static final ForkJoinPool sharedInnerPool = new ForkJoinPool();

  public static void main(String[] args){
//    testInnerParallelLoopWithSharedPoolAndManagedBlock(10000);
//    testInnerParallelLoopWithManagedBlock(2);
  }

  public static void testInnerParallelLoopWithSharedPoolAndManagedBlock(int limit){
    Map<Integer, Integer> threads = new ConcurrentHashMap<>();
    boolean threwException = false;
    for(int c = 0; c < limit; c++) {
      try {
        StateData.outerLoop.parallelStream().unordered().forEach(i -> {
          sharedInnerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
            try {
              ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          })).join();
          int count = sharedInnerPool.getActiveThreadCount();
          threads.put(count, count);
        });
      } catch (Exception e) {
        System.out.println(e.getMessage());
        threwException = true;
        int count = ForkJoinPool.commonPool().getActiveThreadCount();
        threads.clear();
        threads.put(count, count);
        break;
      }
    }
    if(threwException){
      System.out.println("Exception thrown. Max Thread Count = "+  threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    } else {
      System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    }
  }

  public static void testInnerParallelLoopWithManagedBlock(int limit){
    Map<Integer, Integer> threads = new ConcurrentHashMap<>();
    boolean threwException = false;
    for(int c = 0; c < limit; c++) {
      try {
        StateData.outerLoop.parallelStream().unordered().forEach(i -> {
          innerParallelLoopWithManagedBlock();
          int count = ForkJoinPool.commonPool().getActiveThreadCount();
          threads.put(count, count);
        });
      } catch (Exception e) {
        System.out.println(e.getMessage());
        threwException = true;
        int count = ForkJoinPool.commonPool().getActiveThreadCount();
        threads.clear();
        threads.put(count, count);
        break;
      }
    }
    if(threwException){
      System.out.println("Exception thrown. Max Thread Count = "+  threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    } else {
      System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    }
  }
}
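The SleepManagedBlocker class isn't shown here. Purely for reference, a minimal sketch of what such a ManagedBlocker might look like (the actual implementation may differ):

```java
import java.time.Duration;
import java.util.concurrent.ForkJoinPool;

// Hypothetical minimal version of the SleepManagedBlocker referenced above:
// it sleeps once for the given duration inside ForkJoinPool.managedBlock,
// which lets the pool spin up a compensating thread while this one is parked.
class SleepManagedBlocker implements ForkJoinPool.ManagedBlocker {
  private final Duration sleep;
  private volatile boolean done;

  SleepManagedBlocker(Duration sleep) {
    this.sleep = sleep;
  }

  @Override
  public boolean block() throws InterruptedException {
    if (!done) {
      Thread.sleep(sleep.toMillis());
      done = true;
    }
    return true; // blocking is finished; block() need not be called again
  }

  @Override
  public boolean isReleasable() {
    return done; // true once the sleep has completed
  }
}
```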

Like the title says, the testInnerParallelLoopWithManagedBlock method breaks with the following exception:

java.util.concurrent.RejectedExecutionException: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker

But the testInnerParallelLoopWithSharedPoolAndManagedBlock method works fine up to a count of 10,000 (and probably more, but I haven't tested beyond that).

I initially thought the issue might be due to the interactions between nested parallel streams and ManagedBlocker acting on the same pool. So I created another test method where I use the same shared custom FJPool for both inner and outer loops:

public static void testBothLoopsWithSharedPoolAndManagedBlock(int limit){
    Map<Integer, Integer> threads = new ConcurrentHashMap<>();
    boolean threwException = false;
    for(int c = 0; c < limit; c++) {
      try {
        sharedInnerPool.submit(()->StateData.outerLoop.parallelStream().unordered().forEach(i -> {
          sharedInnerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
            try {
              ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          })).join();
          int count = sharedInnerPool.getActiveThreadCount();
          threads.put(count, count);
        })).join();
      } catch (Exception e) {
        System.out.println(e.getMessage());
        threwException = true;
        int count = ForkJoinPool.commonPool().getActiveThreadCount();
        threads.clear();
        threads.put(count, count);
        break;
      }
    }
    if(threwException){
      System.out.println("Exception thrown. Max Thread Count = "+  threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    } else {
      System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    }
  }

But this one works just fine. So the issue appears to specifically be with ForkJoinPool.commonPool().

Does anyone have an idea as to what's happening behind the scenes?

1 Answer

The common ForkJoinPool¹ has a relatively low maximum pool size:

The parameters used to construct the common pool may be controlled by setting the following system properties:

  • java.util.concurrent.ForkJoinPool.common.parallelism - the parallelism level, a non-negative integer
  • java.util.concurrent.ForkJoinPool.common.threadFactory - the class name of a ForkJoinPool.ForkJoinWorkerThreadFactory. The system class loader is used to load this class.
  • java.util.concurrent.ForkJoinPool.common.exceptionHandler - the class name of a Thread.UncaughtExceptionHandler. The system class loader is used to load this class.
  • java.util.concurrent.ForkJoinPool.common.maximumSpares - the maximum number of allowed extra threads to maintain target parallelism (default 256) [emphasis added].

The custom ForkJoinPool you're using is created by calling the no-argument constructor. That constructor:

Creates a ForkJoinPool with parallelism equal to Runtime.availableProcessors(), using defaults for all other parameters (see ForkJoinPool(int, ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean, int, int, int, Predicate, long, TimeUnit)).

The constructor linked by that documentation says the following regarding the maximum pool size:

maximumPoolSize - the maximum number of threads allowed. When the maximum is reached, attempts to replace blocked threads fail. (However, because creation and termination of different threads may overlap, and may be managed by the given thread factory, this value may be transiently exceeded.) To arrange the same value as is used by default for the common pool, use 256 plus the parallelism level. (By default, the common pool allows a maximum of 256 spare threads.) Using a value (for example Integer.MAX_VALUE) larger than the implementation's total thread limit has the same effect as using this limit (which is the default) [emphasis added].
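To make that concrete, here's a sketch of building a custom pool with the same cap as the default common pool, using the long constructor quoted above. The values other than maximumPoolSize follow the documented defaults of the no-arg constructor (treat them as illustrative, not authoritative):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

class CommonPoolSizedPool {
  public static void main(String[] args) {
    int parallelism = Runtime.getRuntime().availableProcessors();
    // maximumPoolSize = parallelism + 256, the same cap the common pool
    // gets by default (256 spare threads on top of the parallelism level).
    ForkJoinPool capped = new ForkJoinPool(
        parallelism,
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        null,               // no uncaught-exception handler
        false,              // asyncMode: LIFO, as with the no-arg constructor
        parallelism,        // corePoolSize (default: same as parallelism)
        parallelism + 256,  // maximumPoolSize: the common pool's default cap
        1,                  // minimumRunnable
        null,               // saturate predicate: null => reject when capped
        60, TimeUnit.SECONDS);
    System.out.println(capped.getParallelism() == parallelism);
    capped.shutdown();
  }
}
```

A pool built this way should hit "Thread limit exceeded replacing blocked worker" under the same load that breaks the common pool, which is a handy way to confirm the diagnosis.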

And if you look at the implementation note in the class Javadoc, you'll see:

Implementation Note:

This implementation restricts the maximum number of running threads to 32767 [emphasis added]. Attempts to create pools with greater than the maximum number result in IllegalArgumentException. Also, this implementation rejects submitted tasks (that is, by throwing RejectedExecutionException) only when the pool is shut down or internal resources have been exhausted.

In short, your custom ForkJoinPool has a much larger effective maximum pool size (the implementation's 32,767-thread limit) than the common pool's default of parallelism + 256. You can likely prevent the RejectedExecutionException with the common pool by increasing the maximum number of spares via the java.util.concurrent.ForkJoinPool.common.maximumSpares system property.
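For example, the limit could be raised to an arbitrary illustrative value of 1024 (not a tuned recommendation) either on the command line or programmatically. Note the property is read once when the common pool is initialized, so it must be set before anything touches the pool:

```java
// On the command line:
//   java -Djava.util.concurrent.ForkJoinPool.common.maximumSpares=1024 NestedPerf
//
// Or programmatically, at the very start of main, before any use of
// parallel streams, CompletableFuture, etc.:
class RaiseCommonPoolSpares {
  public static void main(String[] args) {
    System.setProperty(
        "java.util.concurrent.ForkJoinPool.common.maximumSpares", "1024");
    // Any later initialization of the common pool picks up the larger limit.
    System.out.println(System.getProperty(
        "java.util.concurrent.ForkJoinPool.common.maximumSpares"));
  }
}
```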


¹ Links are for Java 23, but as far as I can tell the relevant documentation hasn't changed since Java 17.


2 Comments

Thanks @Slaw, that cleared it right up! Out of curiosity, you wouldn't happen to know (or have insight into) why commonPool was set up this way, would you? It seems a bit odd that the default for commonPool is only 256 spares while a no-arg-constructed FJPool allows ~32,000 threads.
I can't give a definitive answer (though maybe there's one in some mailing list or JEP somewhere), but my guess is because the common pool is global and they want to mitigate the risk of some arbitrary code overwhelming the computer with threads (although a blocked thread doesn't consume much if any CPU time, it does still consume computer resources, such as memory). Off the top of my head, the Stream API, CompletableFuture, and ForkJoinTask all have methods which implicitly utilize the common fork-join pool, and any other API can do the same.
