
My _baseBlockContainer.GetBaseBlocks() returns a ConcurrentQueue with 15,317 objects. For further processing I want to group them by Type. However, some objects always go "missing".

It appears that my Parallel.ForEach is not thread-safe: the number of objects in the ConcurrentQueue for a Type is sometimes lower (off by 1 to 250 objects per Type) than when grouped by the synchronous foreach, but I do not see where or why.

var baseBlocks = _baseBlockContainer.GetBaseBlocks();

var baseBlocksByTypeConcurrent = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
// results of this always differ
Parallel.ForEach(baseBlocks, bb =>
{
  if (!baseBlocksByTypeConcurrent.ContainsKey(bb.GetType()))
  {
    baseBlocksByTypeConcurrent[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
  }
  baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);
});

var baseBlocksByType = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
// results of this are always the same
foreach (var bb in baseBlocks)
{
  if (!baseBlocksByType.ContainsKey(bb.GetType()))
  {
     baseBlocksByType[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
  }
  baseBlocksByType[bb.GetType()].Enqueue(bb);
}
  • No. When I call _baseBlockContainer.GetBaseBlocks() after this method, or somewhere else in the class, the number of objects in the queue hasn't changed (obviously, since I don't Dequeue() any anywhere in my code). I see now that my wording was pretty bad: the Parallel.ForEach is just missing / ignoring / not adding some objects to the dictionary. Commented May 31, 2022 at 6:36

2 Answers


Replace this:

if (!baseBlocksByTypeConcurrent.ContainsKey(bb.GetType()))
{
    baseBlocksByTypeConcurrent[bb.GetType()] = new ConcurrentQueue<BaseBlock>();
}
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);

with this:

baseBlocksByTypeConcurrent.TryAdd(bb.GetType(), new ConcurrentQueue<BaseBlock>());
baseBlocksByTypeConcurrent[bb.GetType()].Enqueue(bb);

The problem with your existing code is that if .ContainsKey evaluates to false on multiple threads at the same time for the same block type, each of those threads will assign a new queue for that type, overwriting the previous queue along with any blocks already enqueued in it. That is to say: ContainsKey and the indexer are each thread-safe by themselves, but not when composed into a check-then-act sequence the way you are doing it.

TryAdd is atomic and will only ever add the key once, rather than overwriting it as assigning through the indexer would.
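For completeness, here is a self-contained sketch of the TryAdd pattern. The BaseBlock hierarchy below is a stand-in, since the question doesn't show its real definition; the point is only that every block survives the parallel grouping.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class Demo
{
    // Stand-ins for the question's BaseBlock hierarchy (assumption:
    // the real types aren't shown in the question).
    public class BaseBlock { }
    public class BlockA : BaseBlock { }
    public class BlockB : BaseBlock { }

    public static ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>> GroupByType(
        IEnumerable<BaseBlock> baseBlocks)
    {
        var byType = new ConcurrentDictionary<Type, ConcurrentQueue<BaseBlock>>();
        Parallel.ForEach(baseBlocks, bb =>
        {
            // TryAdd only succeeds for the first thread to see a type;
            // later threads leave the existing queue in place instead of
            // overwriting it, which is what lost blocks in the question.
            byType.TryAdd(bb.GetType(), new ConcurrentQueue<BaseBlock>());
            byType[bb.GetType()].Enqueue(bb);
        });
        return byType;
    }

    public static void Main()
    {
        var blocks = Enumerable.Range(0, 15317)
            .Select(i => i % 2 == 0 ? (BaseBlock)new BlockA() : new BlockB())
            .ToList();
        var grouped = GroupByType(blocks);
        // Every block ends up in exactly one queue.
        Console.WriteLine(grouped.Values.Sum(q => q.Count)); // 15317
    }
}
```

Note that this still allocates a throwaway queue on iterations where TryAdd fails; that is wasted allocation, not a correctness issue.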


1 Comment

Thank you very much! Your solution not only fixed the problem but also reduced nesting! :-)

Your code is suffering from what is known as a race condition. Using concurrent collections alone does not prevent race conditions from occurring. You also have to use them correctly, by utilizing their special atomic APIs. In your case the appropriate API is the GetOrAdd method:

Adds a key/value pair to the ConcurrentDictionary<TKey,TValue> if the key does not already exist. Returns the new value, or the existing value if the key already exists.

Usage:

ParallelOptions options = new()
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.ForEach(baseBlocks, options, bb =>
{
    baseBlocksByTypeConcurrent
        .GetOrAdd(bb.GetType(), _ => new ConcurrentQueue<BaseBlock>())
        .Enqueue(bb);
});

As a side note, whenever you use the Parallel.ForEach method it is advisable¹ to specify the MaxDegreeOfParallelism explicitly. The default MaxDegreeOfParallelism is -1, which means unbounded parallelism and in practice saturates the ThreadPool.

¹ It is advisable by me, not Microsoft. The general advice offered in the official documentation is that modifying the MaxDegreeOfParallelism setting is not needed. Microsoft's argument is that defaulting this option to Environment.ProcessorCount might cause a performance regression, and that it opens the possibility of deadlock in case one iteration depends on another executing iteration. Apparently Microsoft does not consider ThreadPool saturation a serious problem.
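To illustrate the footnote, here is a sketch (not from the answer) that watches the pool grow during an unbounded Parallel.ForEach whose body blocks. It uses ThreadPool.ThreadCount, which is available on .NET Core 3.0 and later; Thread.Sleep stands in for blocking I/O.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class SaturationDemo
{
    // Runs a blocking Parallel.ForEach with the default (unbounded)
    // MaxDegreeOfParallelism and reports the peak number of ThreadPool
    // threads observed while it ran.
    public static int PeakPoolThreads(int items, int blockMs)
    {
        int maxSeen = 0;
        Parallel.ForEach(Enumerable.Range(0, items), _ =>
        {
            // Blocked workers prompt the ThreadPool to inject extra
            // threads over time, so the count keeps climbing.
            Thread.Sleep(blockMs);
            int current = ThreadPool.ThreadCount;
            int prev;
            while (current > (prev = Volatile.Read(ref maxSeen)))
                Interlocked.CompareExchange(ref maxSeen, current, prev);
        });
        return maxSeen;
    }

    public static void Main()
    {
        Console.WriteLine($"Peak pool threads: {PeakPoolThreads(200, 100)}");
        // Passing new ParallelOptions { MaxDegreeOfParallelism =
        // Environment.ProcessorCount } instead keeps the worker count bounded.
    }
}
```

The exact peak depends on the machine and the runtime's thread-injection heuristics, so no particular number is guaranteed.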

2 Comments

Thank you for your answer and your suggestions. One follow-up question: the program will run on a server for a few seconds and should be as fast as possible. Is ThreadPool saturation even a consideration for this task, since no other program will need resources during that time (in other words, I am fine if other stuff runs slower during the processing)?
@MichaelS my opinion is that saturating the ThreadPool is wrong in principle. In the case you describe, the impact of saturating the ThreadPool is likely nonexistent, but why do it? It will only reinforce a bad habit for the next time you need to use Parallel.ForEach, where the impact might be significant.
