This is a follow-up from this post: Why are inner parallel streams faster with new pools than with the commonPool for this scenario?
Apologies in advance for the wall of text. This is for JDK 17.
I am currently testing methods with these scenarios:
- An outer
parallelStream, and an inner nestedparallelStreamthat usesManagedBlockerto callThread::sleep - An outer
parallelStream, and an inner nestedparallelStreamthat uses a different FJPool, but also usesManagedBlockerto callThread::sleep
Here's how the code looks:
public class NestedPerf {
private static final ForkJoinPool sharedInnerPool = new ForkJoinPool();
public static void main(String[] args){
// testInnerParallelLoopWithSharedPoolAndManagedBlock(10000);
// testInnerParallelLoopWithManagedBlock(2);
}
public static void testInnerParallelLoopWithSharedPoolAndManagedBlock(int limit){
Map<Integer, Integer> threads = new ConcurrentHashMap<>();
boolean threwException = false;
for(int c = 0; c < limit; c++) {
try {
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
sharedInnerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
try {
ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})).join();
int count = sharedInnerPool.getActiveThreadCount();
threads.put(count, count);
});
} catch (Exception e) {
System.out.println(e.getMessage());
threwException = true;
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.clear();
threads.put(count, count);
break;
}
}
if(threwException){
System.out.println("Exception thrown. Max Thread Count = "+ threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
} else {
System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
}
}
public static void testInnerParallelLoopWithManagedBlock(int limit){
Map<Integer, Integer> threads = new ConcurrentHashMap<>();
boolean threwException = false;
for(int c = 0; c < limit; c++) {
try {
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
innerParallelLoopWithManagedBlock();
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.put(count, count);
});
} catch (Exception e) {
System.out.println(e.getMessage());
threwException = true;
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.clear();
threads.put(count, count);
break;
}
}
if(threwException){
System.out.println("Exception thrown. Max Thread Count = "+ threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
} else {
System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
}
}
}
Like the title says, the testInnerParallelLoopWithManagedBlock method breaks with the following exception:
java.util.concurrent.RejectedExecutionException: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
But the testInnerParallelLoopWithSharedPoolAndManagedBlock method works fine up to a count of 10,000 (and probably more but I haven't tested beyond 10,000).
I initially thought the issue might be due to the interactions between nested parallel streams and ManagedBlocker acting on the same pool. So I created another test method where I use the same shared custom FJPool for both inner and outer loops:
public static void testBothLoopsWithSharedPoolAndManagedBlock(int limit){
Map<Integer, Integer> threads = new ConcurrentHashMap<>();
boolean threwException = false;
for(int c = 0; c < limit; c++) {
try {
sharedInnerPool.submit(()->StateData.outerLoop.parallelStream().unordered().forEach(i -> {
sharedInnerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
try {
ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})).join();
int count = sharedInnerPool.getActiveThreadCount();
threads.put(count, count);
})).join();
} catch (Exception e) {
System.out.println(e.getMessage());
threwException = true;
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.clear();
threads.put(count, count);
break;
}
}
if(threwException){
System.out.println("Exception thrown. Max Thread Count = "+ threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
} else {
System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
}
}
But this one works just fine. So the issue appears to specifically be with ForkJoinPool.commonPool().
Does anyone have an idea as to what's happening behind the scenes ?