"ConcurrentBag(T) is a thread-safe bag implementation, optimized for scenarios where the same thread will be both producing and consuming data stored in the bag." - MSDN

I have this exact use case (multiple threads both consuming and producing), but I need to determine efficiently and promptly when the bag becomes permanently empty. My threads only produce in response to what they consume, and the bag is seeded with a single element before the threads are started.

I am having trouble finding an efficient, race-free way to do this without global locks. I believe that introducing global locks would negate the benefits of using the mostly lock-free ConcurrentBag.

My actual use case is an "unordered" (binary) tree traversal. I just need to visit every node, and do some very light computation for each and every one of them. I don't care about the order in which they are visited. The algorithm should terminate when all nodes have been visited.

int taskCount = Environment.ProcessorCount;
Task[] tasks = new Task[taskCount];
var bag = new ConcurrentBag<TreeNode>();
bag.Add(root);
for (int i = 0; i < taskCount; i++)
{
    int threadId = i;
    tasks[threadId] = new Task(() =>
    {
        while(???) // Putting !bag.IsEmpty here would obviously be wrong, as some other thread could have removed the last node but not yet added that node's children
        {
            TreeNode node;
            bool success = bag.TryTake(out node);

            if (!success) continue; // This spinning is probably not very clever, but I don't really mind it.

            // Placeholder: Do stuff with node

            if (node.Left != null) bag.Add(node.Left);
            if (node.Right != null) bag.Add(node.Right);
        }
    });
    tasks[threadId].Start();
}
Task.WaitAll(tasks);

So how could one add an efficient termination condition to this? I don't mind the condition becoming costly when the bag is close to being empty.

  • I just noticed that the Task.WaitAll(tasks); line is misplaced; it should come after the loop. Also, prefer Task.Run. Commented Dec 6, 2015 at 14:20
  • Thank you. I messed up the line placement while I was simplifying the code for this post. It is at the correct place in my original code. Fixed it now. Commented Dec 7, 2015 at 0:50
  • Thanks, I will have to look into the difference between Start and Run. Commented Dec 7, 2015 at 0:50
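
On the Start versus Run point raised in the comments, here is a minimal sketch of the two styles side by side. The class and method names (StartVersusRun, Demo) are hypothetical and exist only for illustration. A task built with the constructor stays unscheduled until Start() is called, whereas Task.Run creates the task and queues it to the thread pool in a single call.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class; not part of the original question.
public static class StartVersusRun
{
    public static int[] Demo()
    {
        // "Cold" task: constructed unscheduled, so it must be started explicitly.
        var cold = new Task<int>(() => 1);
        cold.Start();

        // Task.Run creates the task and queues it to the thread pool in one call.
        Task<int> hot = Task.Run(() => 2);

        Task.WaitAll(cold, hot);
        return new[] { cold.Result, hot.Result };
    }
}
```

For the loop in the question, this would mean replacing the new Task(...) and Start() pair with a single tasks[threadId] = Task.Run(...) assignment.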

1 Answer

I have had this problem before. I had each thread register itself as waiting before checking the queue. If the queue is empty and all other threads are waiting as well, we are done. If other threads are still busy, here comes the hack: sleep for 10 ms and check again. I believe it is possible to solve this without sleeping by using some kind of synchronization primitive (maybe Barrier).

The code went like this:

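// Assumed shared state (not shown in the original snippet):
//   ConcurrentQueue<string> queue; CancellationToken cancellationToken;
//   int threadCountWaiting;   // threads currently inside Dequeue
//   int pendingThreadCount;   // worker threads that have not yet finished
string Dequeue()
{
    // Register as waiting before checking the queue.
    Interlocked.Increment(ref threadCountWaiting);
    try
    {
        while (true)
        {
            if (queue.TryDequeue(out string result))
                return result;

            // Empty queue and every remaining thread is waiting: no more work can appear.
            if (cancellationToken.IsCancellationRequested ||
                Volatile.Read(ref threadCountWaiting) == Volatile.Read(ref pendingThreadCount))
            {
                Interlocked.Decrement(ref pendingThreadCount);
                return null;
            }

            Thread.Sleep(10); // The hack: back off while other threads are still busy.
        }
    }
    finally
    {
        Interlocked.Decrement(ref threadCountWaiting);
    }
}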

It might be possible to replace both the sleep and the counter maintenance with a Barrier. I did not bother, as this was complicated enough already.

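Interlocked operations on a shared counter can become a scalability bottleneck: each one is an atomic read-modify-write that needs exclusive ownership of the counter's cache line, so cores contending for the same counter end up serialized. So you might want to insert a fast path at the beginning of the method: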

            // Fast path: try the queue before touching the shared counters.
            if (queue.TryDequeue(out string item))
                return item;

And most of the time the fast path will be taken.
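
To connect this back to the question, here is a minimal sketch of how the same waiting-counter idea might be applied to the ConcurrentBag tree traversal. It is an illustration under assumptions, not a tuned implementation: the TreeNode fields, the BagTraversal class, and the VisitAll and BuildTree helpers are all hypothetical names, and the visited counter merely stands in for the per-node work. A worker that gives up stays counted in waiting, so the counter is compared against the fixed worker count.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical node type; the question does not show TreeNode's definition.
public sealed class TreeNode
{
    public TreeNode Left;
    public TreeNode Right;
}

public static class BagTraversal
{
    // Visits every node reachable from root; returns the number of nodes visited.
    public static int VisitAll(TreeNode root, int workerCount)
    {
        var bag = new ConcurrentBag<TreeNode>();
        bag.Add(root);
        int waiting = 0; // workers that are idle (looking for work) or already finished
        int visited = 0;

        var tasks = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            tasks[i] = Task.Run(() =>
            {
                while (true)
                {
                    // Fast path: no shared counter is touched while work is available.
                    if (!bag.TryTake(out TreeNode node))
                    {
                        // Slow path: register as waiting, then re-check the bag.
                        Interlocked.Increment(ref waiting);
                        while (!bag.TryTake(out node))
                        {
                            // Empty bag and nobody busy: no new nodes can appear.
                            if (Volatile.Read(ref waiting) == workerCount) return;
                            Thread.Sleep(10); // the back-off hack from the answer
                        }
                        Interlocked.Decrement(ref waiting);
                    }

                    Interlocked.Increment(ref visited); // placeholder for the real work
                    if (node.Left != null) bag.Add(node.Left);
                    if (node.Right != null) bag.Add(node.Right);
                }
            });
        }
        Task.WaitAll(tasks);
        return visited;
    }

    // Builds a complete binary tree with the given number of levels (demo helper).
    public static TreeNode BuildTree(int levels)
    {
        if (levels <= 0) return null;
        return new TreeNode { Left = BuildTree(levels - 1), Right = BuildTree(levels - 1) };
    }
}
```

Calling BagTraversal.VisitAll(BagTraversal.BuildTree(10), Environment.ProcessorCount) should visit all 1023 nodes of a 10-level complete tree. One caveat of this sketch: a worker can observe the counter at its maximum just after another worker has taken a node but before that worker has deregistered, and then exit early. No node is lost in that case, because the worker holding the node keeps running and always re-checks the bag before giving up, but some parallelism may be sacrificed near the end.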
