How to use IObservable/IObserver with ConcurrentQueue or ConcurrentStack

Posted by James Black on Stack Overflow
Published on 2010-06-13T00:47:32Z

I have multiple threads putting items into a concurrent queue while multiple other threads process them, and I realized that the ideal solution would be to combine the Reactive Extensions with the concurrent data structures.

My original question is at:

http://stackoverflow.com/questions/2997797/while-using-concurrentqueue-trying-to-dequeue-while-looping-through-in-parallel/

So I am curious whether there is any way to have a LINQ (or PLINQ) query that continuously dequeues items as they are put into the queue.

I am trying to get this to work so that I can have n producers pushing into the queue and a limited number of threads processing it, so I don't overload the database.

If I could use the Rx framework, I expect that I could just start it, and if 100 items were enqueued within 100 ms, the 20 threads that are part of the PLINQ query would just work through the queue.
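To make the setup concrete, here is a minimal sketch of the "n producers, fixed number of workers" arrangement. It does not use Rx or PLINQ. It swaps in BlockingCollection<T> from System.Collections.Concurrent, wrapped around a ConcurrentQueue<T>, as one possible way to get continuous dequeueing with a capped number of threads. The class name BoundedConsumers, the method Run, and its parameters are hypothetical names made up for illustration, and the database call is only a placeholder.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration class; not part of any library.
public static class BoundedConsumers
{
    // Starts producerCount producers and workerCount consumers,
    // and returns how many items the consumers processed.
    public static int Run(int producerCount, int itemsPerProducer, int workerCount)
    {
        int processed = 0;

        // BlockingCollection adds blocking "take" semantics on top of the
        // ConcurrentQueue, so idle workers wait instead of spinning.
        using (var queue = new BlockingCollection<int>(new ConcurrentQueue<int>()))
        {
            // A fixed number of workers caps the concurrent load on the database.
            Task[] workers = Enumerable.Range(0, workerCount)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    // Blocks while the queue is empty; the loop ends only after
                    // CompleteAdding() has been called and the queue is drained.
                    foreach (int item in queue.GetConsumingEnumerable())
                    {
                        // Placeholder for the real work (e.g. a database write).
                        Interlocked.Increment(ref processed);
                    }
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            // Any number of producers can add items concurrently.
            Task[] producers = Enumerable.Range(0, producerCount)
                .Select(p => Task.Factory.StartNew(() =>
                {
                    for (int i = 0; i < itemsPerProducer; i++)
                    {
                        queue.Add(p * itemsPerProducer + i);
                    }
                }))
                .ToArray();

            Task.WaitAll(producers);
            queue.CompleteAdding(); // signal the workers that no more items will arrive
            Task.WaitAll(workers);
        }

        return processed;
    }
}
```

Under these assumptions, calling BoundedConsumers.Run(4, 25, 20) enqueues 100 items from 4 producers and lets 20 workers drain them. In a long-running service the producers would keep calling Add and CompleteAdding would be called only at shutdown.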

There are three technologies I am trying to get to work together:

  1. Rx Framework (Reactive LINQ)
  2. PLINQ
  3. System.Collections.Concurrent structures

