0

I have the following async queue processing routing.

      var commandQueue = new BlockingCollection<MyCommand>();
      commandQueue
            .GetConsumingEnumerable()
            .ToObservable(new LimitedConcurrencyLevelTaskPoolScheduler(5))
            .Subscribe(c =>
                           {
                               try
                               {
                                   ProcessCommand(c);
                               }
                               catch (Exception ex)
                               {
                                   Trace.TraceError(ex.ToString());
                               }
                           }
            );

In one specific scenario (when I'm about to get some data), I need to make sure that my commandQueue is empty before going out and getting the data. This operation is expected to happen synchronously. Basically, I want to do something like

  public void GetData()
  {
     commandQueue.WaitForEmpty(); 

     // could potentially be expressed: 
     // while (commandQueue.Count > 0) Thread.Sleep(10);

     return GoGetTheData()
  }

I realize that in an ideal scenario, all callers will "GetData" async...but sometimes it's necessary that it happen in a synchronous manner...and so I need to wait for the command queue to be empty to ensure the consistency and up-to-date-ness of my data.

I know how I can do this pretty easily with a ManualResetEvent...but I'd like to know if there's an easy way with System.Reactive/TPL.

Thanks.

3 Answers 3

1

This is a more difficult question than it seems at first. You want BlockingCollection (and the underlying ConcurrentQueue) for producer-consumer job semantics. But you also want to be able to observe what's happening with these collections, including waiting for the 'empty' signal.

Best bet is to take a look at JobQueue and ParallelJobQueue from here:

http://social.msdn.microsoft.com/Forums/en-US/rx/thread/2817c6e5-e5a4-4aac-91c1-97ba7de88ff7

Which includes an observable for WhenQueueEmpty and can control the number of simultaneously running jobs and queued jobs (jobs being synonymous in this case with your command concept).

Sign up to request clarification or add additional context in comments.

Comments

0

Could you use this?

    var dataObservable = Observable.Start(() =>
    {
        commandQueue.WaitForEmpty(); 
        return GoGetTheData();
    });

5 Comments

The issue is that there is no such method as WaitForEmpty
@JeffN825 - What is BlockCollection then? Is it something you've defined?
sorry, not sure how that got truncated in my example. it's BlockingCollection msdn.microsoft.com/en-us/library/dd267312.aspx
@JeffN825 - I'm not an expert on BlockingCollection but I would imagine that there are a number of built-in mechanisms for determining when the queue is empty. Just choose one for the WaitForEmpty method. Does that help?
Sort of. I can easily check if it's empty, but what's the most effective way of waiting? Something more effective than Thread.Sleep preferably.
0

It seems to me your requirements are to

  • Fetch data asynchronously
  • Process this data in parallel (max of 5 degrees of parallelism)
  • Repeat the process

If these are your requirements and you are not forced to use the BlockingCollection i.e. it is not an existing API, then I think you can solve this quite easily with Rx alone.

var dataRequestScheduler = new EventLoopScheduler();
var subscription = GetTheData()
    .Repeat()
    .SubscribeOn(dataRequestScheduler)
    .ObserveOn(Scheduler.TaskPool)//new LimitedConcurrencyLevelTaskPoolScheduler(5)
    .Subscribe(c =>
           {
               try
               {
                   ProcessCommand(c);
               }
               catch (Exception ex)
               {
                   Trace.TraceError(ex.ToString());
               }
           }
        );

Where GetTheData method returns an IObservable

You could potentially leverage Observable.Start and Merge(5) to get your max 5 threads without the need for a custom scheduler.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.