
So I recently ran a benchmark where I compared the performance of nested streams in 3 cases:

  • Parallel outer stream and sequential inner stream
  • Parallel outer and inner streams (using parallelStream) - this effectively tests `ForkJoinPool.commonPool()`
  • Parallel outer and inner streams, but the inner streams create a new ForkJoinPool for each task

Here's the benchmark code (I've used JMH):

public class NestedPerf {
  @State(Scope.Benchmark)
  public static class StateData{
    public static final List<Integer> outerLoop = IntStream.range(0, 32).boxed().toList();
    public static final List<Integer> innerLoop = IntStream.range(0, 32).boxed().toList();
  }
  private static void runInNewPool(Runnable task) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      pool.submit(task).join();
    } finally {
      pool.shutdown();
    }
  }
  private static void innerParallelLoop() {
    StateData.innerLoop.parallelStream().unordered().forEach(i -> {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }
  private static void innerSequentialLoop() {
    StateData.innerLoop.stream().unordered().forEach(i -> {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }
  @Benchmark
  public void testingNewPool(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      runInNewPool(NestedPerf::innerParallelLoop);
      bh.consume(i);
    });
  }

  @Benchmark
  public void testingCommonPoolWithSequentialInner(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      innerSequentialLoop();
      bh.consume(i);
    });
  }
  @Benchmark
  public void testingCommonPool(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      innerParallelLoop();
      bh.consume(i);
    });
  }
}

And here is the output:

Benchmark                                         Mode  Cnt   Score   Error  Units
NestedPerf.testingCommonPool                     thrpt   25   1.935 ± 0.005  ops/s
NestedPerf.testingCommonPoolWithSequentialInner  thrpt   25   1.744 ± 0.007  ops/s
NestedPerf.testingNewPool                        thrpt   25  22.648 ± 0.559  ops/s

The difference between the method with new pools and the method with the commonPool is surprising. Does anyone have an idea why creating new pools makes things around 12x faster for this benchmark?

If it helps, I'm running this on a Core i7 10850H system with 12 available CPUs (hexcore + hyperthreading).

2 Answers

The performance difference you're observing comes from how the ForkJoinPool.commonPool() handles nested parallel streams. When both the outer and inner streams use parallelStream() without a custom thread pool, they compete for the same limited set of threads in the common pool. This leads to thread contention and under-utilization of CPU resources because the pool cannot effectively manage the nested parallelism.

By creating a new ForkJoinPool for each inner stream, you provide dedicated threads for inner tasks, avoiding contention with the outer stream's threads. This allows both levels of parallelism to utilize the CPU cores fully. As you've noticed, this results in a substantial performance boost despite the overhead of creating new pools.

Possible Improvements:

You can avoid the overhead of creating multiple pools by using a shared custom ForkJoinPool for all inner streams. This approach eliminates the pool creation overhead while still providing separate threads for inner parallelism, which may lead to even better performance if the shared pool has enough threads.

public class NestedPerf {
  private static final ForkJoinPool innerPool = new ForkJoinPool();

  @Benchmark
  public void testingSharedInnerPool(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      innerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
        try {
          Thread.sleep(5);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      })).join();
      bh.consume(i);
    });
  }
}
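
A variation on the shared-pool idea, offered only as a sketch: the asker's follow-up comment reports that a default-sized shared pool made little difference for this sleep-based workload, and that roughly 50 extra threads were needed. The example below gives the shared pool an explicit parallelism. The class name `SizedSharedPool`, the method `runNested`, and the value 64 are all hypothetical choices for illustration, not tuned numbers. Like the code above, it relies on a parallel stream started inside a ForkJoinPool task running in that pool, which is observed behavior rather than a documented guarantee.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

final class SizedSharedPool {

  // Hypothetical value: chosen arbitrarily so that many sleeping tasks can overlap.
  static final int INNER_PARALLELISM = 64;

  // One pool shared by every inner stream, created once.
  static final ForkJoinPool INNER_POOL = new ForkJoinPool(INNER_PARALLELISM);

  /** Runs outer x inner sleeping tasks and returns how many completed. */
  static int runNested(int outer, int inner, long sleepMillis) {
    AtomicInteger done = new AtomicInteger();
    IntStream.range(0, outer).parallel().forEach(i ->
        // Submit the whole inner stream to the shared pool and wait for it.
        INNER_POOL.submit(() ->
            IntStream.range(0, inner).parallel().forEach(j -> {
              try {
                Thread.sleep(sleepMillis);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
              done.incrementAndGet();
            })).join());
    return done.get();
  }

  public static void main(String[] args) {
    System.out.println("completed: " + runNested(32, 32, 5));
  }
}
```

Whether 64 is enough depends on how many sleeps need to overlap, so the right number would have to be found by measuring.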

Alternatively, you could flatten the parallelism to a single level by combining the outer and inner loops into one parallel stream. This method can effectively utilize the parallel stream without nested parallelism, often resulting in the fastest execution time because it maximizes CPU utilization and minimizes overhead.

@Benchmark
public void testingFlattenedStream(Blackhole bh){
  StateData.outerLoop.parallelStream()
    .flatMap(i -> StateData.innerLoop.stream())
    .unordered()
    .forEach(j -> {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      bh.consume(j);
    });
}

If you're going for simplicity and speed then flattening is likely going to win out.


7 Comments

@Andorrax See this excerpt from the Javadoc of ForkJoinPool: "The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others. However, no such adjustments are guaranteed in the face of blocked I/O or other unmanaged synchronization." Your use of Thread::sleep is preventing the pool from maintaining its parallelism. Try using a ManagedBlocker.
@Andorrax, ForkJoinPool.commonPool() uses a fixed number of threads (e.g., availableProcessors - 1), to prevent overloading the system. When nested parallelism occurs within the same pool they can end up waiting on tasks from their own pool, creating a bottleneck (essentially, a deadlock-like situation without actually being one). A new ForkJoinPool creates an independent thread pool that doesn't share threads with the commonPool. The tasks in this pool don’t get blocked by tasks in commonPool. It avoids thread starvation caused by inner task delays waiting for outer tasks to complete.
Re: "Does it being faster mean it's actually stealing threads from the commonPool?". Not really. When you create a new pool, it creates its own threads that are entirely separate from the commonPool. These threads aren't "stolen"; they are created fresh by the new pool. However, the overall system still competes for the same CPU cores, which is why running too many new pools or overloading the CPU can hurt performance.
@Slaw, spot on. @Andorrax, when a thread in the ForkJoinPool sleeps, it temporarily removes itself from active execution. The pool can’t reassign that thread to another task until the sleep completes, which stalls progress. Using ForkJoinPool.ManagedBlocker can help the pool manage blocked threads more efficiently. This mechanism signals the pool to temporarily add new threads if existing ones are blocked, maintaining parallelism without starving other tasks.
@l'L'l I tried using the shared inner pool as noted in your example, but it didn't really help much (almost no difference from the commonPool method). I realised that because of the nature of the benchmark (thread sleeping being a blocking op), I'd need to add a fairly large number of threads even with a shared pool, for the same reasons @Slaw noted (testing indicates I'd need around 50 more threads to get similar performance to the new-pool method). I've marked Slaw's answer as the correct one since it noted the Thread::sleep issue, but your comments also helped me dig into what was going on. Thanks!

Why Throughput Increases

Your tasks are simply a call to Thread::sleep. That blocks the calling thread, which means the OS will not schedule the thread for execution until the specified duration elapses. This leaves the CPU free to execute any other threads. In other words, your tasks are not CPU-bound and thus do not burden the CPU. That means throwing more threads at your set of tasks is going to increase throughput without overwhelming the CPU.

By using multiple fork-join pools, you are effectively increasing the number of threads available to execute your tasks. It's not much different from simply increasing the number of threads in a single pool. Whether you have 1 pool with 15 threads or 3 pools with 5 threads each, you still end up with a total of 15 threads.

Let's say you have 10 tasks that each sleep for 5 milliseconds. If you have 5 threads to execute those tasks, then you'll roughly see:

Start 5 tasks => Wait 5 ms => Start 5 tasks => Wait 5 ms => Done!

But if you have 10 threads you'll roughly see:

Start 10 tasks => Wait 5 ms => Done!

The first takes a total of 10 milliseconds to execute every task, the second only takes 5 milliseconds. And that's basically where the increased throughput is coming from in your tests.
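
The arithmetic above can be written down as a small helper. This is only a rough model, assuming every task sleeps for the same duration and that scheduling overhead is negligible; the class and method names are hypothetical.

```java
final class WaveEstimate {

  /**
   * Rough lower bound on elapsed time: the tasks run in
   * ceil(tasks / threads) "waves", each lasting sleepMillis.
   */
  static long estimateMillis(int tasks, int threads, long sleepMillis) {
    long waves = (tasks + threads - 1) / threads; // ceiling division
    return waves * sleepMillis;
  }

  public static void main(String[] args) {
    System.out.println(estimateMillis(10, 5, 5));  // prints 10
    System.out.println(estimateMillis(10, 10, 5)); // prints 5
  }
}
```

Applied to the benchmark in the question (32 x 32 = 1024 sleeps of 5 ms), and assuming about 12 threads do the work (an assumption: the default common-pool workers plus the calling thread on a 12-CPU machine), `estimateMillis(1024, 12, 5)` gives 430 ms per invocation, or about 2.3 ops/s at best. That is in the same ballpark as the roughly 1.9 ops/s reported for the common pool.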


Maintaining Parallelism

All that said, a ForkJoinPool has a set level of parallelism. One way it tries to maintain this parallelism is by spawning a new thread (if the maximum number of threads hasn't already been reached) when one of its threads is blocked. From the documentation:

[A ForkJoinPool] attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others. However, no such adjustments are guaranteed in the face of blocked I/O or other unmanaged synchronization. The nested ForkJoinPool.ManagedBlocker interface enables extension of the kinds of synchronization accommodated.

You're calling Thread::sleep in an unmanaged way. In other words, you're blocking the threads of the pool in such a way that the pool cannot compensate. To prevent that, consider using a ManagedBlocker. Here's an example implementation:

import java.time.Duration;
import java.util.concurrent.ForkJoinPool;

public class SleepManagedBlocker implements ForkJoinPool.ManagedBlocker {

  private final Duration sleepDuration;
  private boolean slept; // Does this need to be volatile?

  public SleepManagedBlocker(Duration sleepDuration) {
    this.sleepDuration = sleepDuration;
  }

  @Override
  public boolean block() throws InterruptedException {
    if (!slept) {
      slept = true;
      Thread.sleep(sleepDuration);
    }
    return slept;
  }

  @Override
  public boolean isReleasable() {
    return slept;
  }
}

Then you would replace the Thread.sleep(5) calls with:

ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)))

You should see similar throughput increases in your tests without needing to use multiple fork-join pools.
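
For a version that can be run on its own, here is a minimal sketch that wires a managed sleep into a single parallel stream. It assumes a small task count (16) so the pool's limit on spare threads is not a concern. It also takes the sleep as a `long` in milliseconds rather than a `Duration`, so it compiles on older JDKs. The names `ManagedSleepDemo`, `SleepBlocker`, `managedSleep`, and `runTasks` are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

final class ManagedSleepDemo {

  /** Sleeps once; the pool is told about the block so it may add a spare thread. */
  static final class SleepBlocker implements ForkJoinPool.ManagedBlocker {
    private final long millis;
    private boolean slept;

    SleepBlocker(long millis) {
      this.millis = millis;
    }

    @Override
    public boolean block() throws InterruptedException {
      if (!slept) {
        slept = true;
        Thread.sleep(millis);
      }
      return true; // no further blocking is necessary
    }

    @Override
    public boolean isReleasable() {
      return slept;
    }
  }

  /** Wraps managedBlock so it can be called from a lambda without a checked exception. */
  static void managedSleep(long millis) {
    try {
      ForkJoinPool.managedBlock(new SleepBlocker(millis));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Runs taskCount managed sleeps in a parallel stream and returns how many completed. */
  static int runTasks(int taskCount, long millis) {
    AtomicInteger completed = new AtomicInteger();
    IntStream.range(0, taskCount).parallel().forEach(i -> {
      managedSleep(millis);
      completed.incrementAndGet();
    });
    return completed.get();
  }

  public static void main(String[] args) {
    System.out.println("completed: " + runTasks(16, 5));
  }
}
```

A fresh `SleepBlocker` is created per call because each instance remembers that it has already slept.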


JMH Benchmarks

Here is a benchmark showing the effect of using ManagedBlocker in this case. It was compiled and executed on Java 23.

import java.time.Duration;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

@Fork(value = 1, jvmArgsAppend = {"-Djava.util.concurrent.ForkJoinPool.common.maximumSpares=1024"})
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public class FJPBenchmarks {

  @Benchmark
  public void runTest(TestState state, Blackhole bh) {
    state.executeOuterLoop(bh);
  }

  @State(Scope.Benchmark)
  public static class TestState {

    private static final Duration SLEEP_DURATION = Duration.ofMillis(5);
    private static final int OUTER_LOOP_COUNT = 32;
    private static final int INNER_LOOP_COUNT = 32;

    @Param({"sequential", "parallel"})
    private String sequentialMode;

    @Param({"common", "separate"})
    private String poolMode;

    @Param({"raw", "managed"})
    private String sleepMode;

    void executeOuterLoop(Blackhole bh) {
      IntStream.range(0, OUTER_LOOP_COUNT)
          .unordered()
          .parallel()
          .forEach(i -> {
            executeInnerLoop(createInnerLoop());
            bh.consume(i);
          });
    }

    IntStream createInnerLoop() {
      var stream = IntStream.range(0, INNER_LOOP_COUNT).unordered();
      return switch (sequentialMode) {
        case "sequential" -> stream.sequential();
        case "parallel" -> stream.parallel();
        default -> throw new IllegalStateException("bad sequentialMode: " + sequentialMode);
      };
    }

    void executeInnerLoop(IntStream loop) {
      var sleeper = getSleeper();
      switch (poolMode) {
        case "common" -> loop.forEach(_ -> sleeper.sleepUnchecked());
        case "separate" -> {
          try (var pool = new ForkJoinPool()) {
            loop.forEach(_ -> pool.submit(sleeper::sleepUnchecked).join());
          }
        }
        default -> throw new IllegalStateException("bad poolMode: " + poolMode);
      }
    }

    Sleeper getSleeper() {
      return switch (sleepMode) {
        case "raw" -> () -> Thread.sleep(SLEEP_DURATION);
        case "managed" -> () -> ForkJoinPool.managedBlock(new SleepManagedBlocker());
        default -> throw new IllegalStateException("bad sleepMode: " + sleepMode);
      };
    }

    @FunctionalInterface
    interface Sleeper {
  
      void sleep() throws InterruptedException;

      default Void sleepUnchecked() {
        try {
          sleep();
        } catch (InterruptedException ex) {
          throw new RuntimeException(ex);
        }
        return null;
      }
    }

    static class SleepManagedBlocker implements ForkJoinPool.ManagedBlocker {

      private boolean slept;

      @Override
      public boolean block() throws InterruptedException {
        if (!slept) {
          slept = true;
          Thread.sleep(SLEEP_DURATION);
        }
        return true;
      }

      @Override
      public boolean isReleasable() {
        return slept;
      }
    }
  }
}

Results (from executing the benchmark on a computer with 8 processors):

Benchmark              (poolMode)  (sequentialMode)  (sleepMode)   Mode  Cnt   Score   Error  Units
FJPBenchmarks.runTest      common        sequential          raw  thrpt    5   1.463 ± 0.022  ops/s
FJPBenchmarks.runTest      common        sequential      managed  thrpt    5   5.858 ± 0.026  ops/s
FJPBenchmarks.runTest      common          parallel          raw  thrpt    5   1.454 ± 0.044  ops/s
FJPBenchmarks.runTest      common          parallel      managed  thrpt    5  35.997 ± 0.234  ops/s
FJPBenchmarks.runTest    separate        sequential          raw  thrpt    5   1.426 ± 0.325  ops/s
FJPBenchmarks.runTest    separate        sequential      managed  thrpt    5   1.348 ± 0.157  ops/s
FJPBenchmarks.runTest    separate          parallel          raw  thrpt    5  13.505 ± 1.175  ops/s
FJPBenchmarks.runTest    separate          parallel      managed  thrpt    5  16.864 ± 0.186  ops/s

3 Comments

Thanks! I think you hit the nail on the head - for this specific benchmark the difference is due to thread sleeps. I tried ManagedBlocker and it does bring the throughput up nicely... to an extent. Unfortunately, I get a RejectedExecutionException after a certain threshold (setting both outerLoop and innerLoop to IntStream.range(0, 20) triggers it on my system), which tells me parallelStream and ManagedBlocker don't seem to mix well... but using ManagedBlocker on a shared inner pool is fine (tested on both JDK 11 and 17)! But that's a topic for a separate post. Thanks again!
The RejectedExecutionException has to do with the differences in the default maximum pool sizes between the common pool and a custom pool created via the no-argument constructor. I've added a JMH benchmark to my answer showing both increasing the maximum pool size of the common pool (by setting the appropriate system property via the @Fork annotation) and the effect of using ManagedBlocker in this particular case.
Thanks for the detailed explanation (and for answering my question on the other post too)!
