C#: System.Collections.Concurrent.ConcurrentQueue vs. Queue

Posted by James Michael Hare on Geeks with Blogs
Published on Mon, 07 Jun 2010 18:37:11 GMT

I love new toys, so of course when .NET 4.0 came out I felt like the proverbial kid in the candy store!  Now, some people get all excited about the IDE and its new features, or about changes to WPF and Silverlight, and yes, those are all very fine and grand.  But me, I get all excited about things that tend to affect my life on the backside of development.  That’s why when I heard there were going to be concurrent container implementations in the latest version of .NET, I was salivating like Pavlov’s dog at the dinner bell.

They seem so simple, really, that one could easily overlook them.  Essentially they are container implementations (many mirror the generic collections, others are new) that are completely thread-safe while using very efficient, limited, or even no locking -- and I just had to see what kind of an improvement that would translate into.
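To see what "thread-safe with no locking in your code" means in practice, here is a minimal sketch of my own (not from the original post) where several tasks hammer a ConcurrentQueue simultaneously without a single lock statement:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

internal static class QuickDemo
{
    private static void Main()
    {
        var queue = new ConcurrentQueue<int>();

        // 4 tasks each enqueue 1000 items concurrently -- no lock anywhere.
        Parallel.For(0, 4, t =>
        {
            for (int i = 0; i < 1000; i++)
            {
                queue.Enqueue(t * 1000 + i);
            }
        });

        // Every enqueue survives the concurrent access.
        Console.WriteLine(queue.Count);   // 4000
    }
}
```

With a plain Queue<int> the same code would corrupt internal state or throw; the concurrent version just works.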

Since part of my job as a solutions architect here where I work is to help design, develop, and maintain the systems that process tons of requests each second, the thought of extremely efficient thread-safe containers was extremely appealing.  Of course, they also rolled out a whole parallel development framework which I won’t get into in this post but will cover bits and pieces of as time goes by.

This time, I was mainly curious as to how well these new concurrent containers would perform compared to areas in our code where we manually synchronize them using lock or some other mechanism.  So I set about to run a processing test with a series of producers and consumers that would be either processing a traditional System.Collections.Generic.Queue or a System.Collections.Concurrent.ConcurrentQueue.

Now, I wanted to keep the code as common as possible to make sure that the only variance was the container, so I created a test Producer and a test Consumer.  The test Producer takes an Action<string> delegate which is responsible for taking a string and placing it on whichever queue we’re testing in a thread-safe manner:

internal class Producer
{
    public int Iterations { get; set; }
    public Action<string> ProduceDelegate { get; set; }

    public void Produce()
    {
        for (int i = 0; i < Iterations; i++)
        {
            ProduceDelegate("Hello");
        }
    }
}

Then likewise, I created a consumer that took a Func<string> that would read from whichever queue we’re testing and return either the string if data exists or null if not.  Then, if the item doesn’t exist, it will do a 10 ms wait before testing again.  Once all the producers are done and join the main thread, a flag will be set in each of the consumers to tell them once the queue is empty they can shut down since no other data is coming:

internal class Consumer
{
    public Func<string> ConsumeDelegate { get; set; }
    public bool HaltWhenEmpty { get; set; }

    public void Consume()
    {
        bool processing = true;

        while (processing)
        {
            string result = ConsumeDelegate();

            if (result == null)
            {
                if (HaltWhenEmpty)
                {
                    processing = false;
                }
                else
                {
                    Thread.Sleep(TimeSpan.FromMilliseconds(10));
                }
            }
            else
            {
                DoWork();    // do something non-trivial so consumers lag behind a bit
            }
        }
    }
}

Okay, now that we’ve done that, we can launch threads of varying numbers using lambdas for each different method of production/consumption.  First let's look at the lambdas for a typical System.Collections.Generic.Queue with locking:

// lambda for putting to typical Queue with locking...
Action<string> productionDelegate = s =>
    {
        lock (_mutex)
        {
            _mutexQueue.Enqueue(s);
        }
    };

// and lambda for typical getting from Queue with locking...
Func<string> consumptionDelegate = () =>
    {
        lock (_mutex)
        {
            if (_mutexQueue.Count > 0)
            {
                return _mutexQueue.Dequeue();
            }
        }
        return null;
    };

Nothing new or interesting here.  Just typical locks on an internal object instance.  Now let's look at using a ConcurrentQueue from the System.Collections.Concurrent library:

// lambda for putting to a ConcurrentQueue, notice it needs no locking!
Action<string> productionDelegate = s =>
    {
        _concurrentQueue.Enqueue(s);
    };

// lambda for getting from a ConcurrentQueue, once again, no locking required.
Func<string> consumptionDelegate = () =>
    {
        string s;
        return _concurrentQueue.TryDequeue(out s) ? s : null;
    };

So I pass each of these lambdas and the number of producer and consumer threads to launch, and take a look at the timing results.  Basically I’m timing from the moment all threads start producing/consuming to the moment they all rejoin.

I won't bore you with the test code; it simply creates the producers and consumers, launches each in its own thread, and waits for them all to rejoin.  The following are the timings from the start of all threads to the Join() on all threads completing.  The producers create 10,000,000 items split evenly between themselves, and when all producers are done they trigger the consumers to stop once the queue is empty.
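For the curious, a rough, self-contained sketch of what such a harness might look like (all names here are my own, not from the original post, and the halt flag stands in for the per-consumer HaltWhenEmpty property):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

internal static class HarnessSketch
{
    private static readonly ConcurrentQueue<string> Queue = new ConcurrentQueue<string>();
    private static volatile bool _halt;

    private static void Produce(int iterations)
    {
        for (int i = 0; i < iterations; i++) Queue.Enqueue("Hello");
    }

    private static void Consume()
    {
        string s;
        while (true)
        {
            if (Queue.TryDequeue(out s)) continue;   // "work" on the item would go here
            if (_halt) return;                       // producers finished and queue drained
            Thread.Sleep(10);                        // nothing yet, poll again shortly
        }
    }

    private static void Main()
    {
        const int producers = 4, consumers = 4, totalItems = 100000;

        var consumerThreads = new List<Thread>();
        for (int i = 0; i < consumers; i++) consumerThreads.Add(new Thread(Consume));

        // split the items evenly across the producers
        var producerThreads = new List<Thread>();
        for (int i = 0; i < producers; i++)
            producerThreads.Add(new Thread(() => Produce(totalItems / producers)));

        var timer = Stopwatch.StartNew();
        consumerThreads.ForEach(t => t.Start());
        producerThreads.ForEach(t => t.Start());

        producerThreads.ForEach(t => t.Join());   // wait for all production to finish
        _halt = true;                             // consumers may now stop once empty
        consumerThreads.ForEach(t => t.Join());
        timer.Stop();

        Console.WriteLine("Elapsed ms: " + timer.ElapsedMilliseconds);
        Console.WriteLine("Remaining items: " + Queue.Count);
    }
}
```

Swapping in the locked-Queue delegates instead of the ConcurrentQueue calls gives the other half of the comparison.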

These are the results in milliseconds from the ordinary Queue with locking:

Consumers   Producers   1       2       3       Time (ms)
----------  ----------  ------  ------  ------  ---------
1           1           4284    5153    4226    4554.33
10          10          4044    3831    5010    4295.00
100         100         5497    5378    5612    5495.67
1000        1000        24234   25409   27160   25601.00

And the following are the results in milliseconds from the ConcurrentQueue with no locking necessary:

Consumers   Producers   1       2       3       Time (ms)
----------  ----------  ------  ------  ------  ---------
1           1           3647    3643    3718    3669.33
10          10          2311    2136    2142    2196.33
100         100         2480    2416    2190    2362.00
1000        1000        7289    6897    7061    7082.33

Note that even though obviously 2000 threads is quite extreme, the concurrent queue actually scales really well, whereas the traditional queue with simple locking scales much more poorly.
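One more of the new .NET 4 concurrent collections worth a mention here: BlockingCollection<T> wraps a concurrent collection (a ConcurrentQueue<T> by default) and lets consumers block until data arrives, so you don't need the 10 ms polling sleep or a manual halt flag at all.  A minimal sketch of my own, not from the original post:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

internal static class BlockingDemo
{
    private static void Main()
    {
        // Backed by a ConcurrentQueue<string> under the hood by default.
        var queue = new BlockingCollection<string>();

        var consumer = Task.Factory.StartNew(() =>
        {
            int count = 0;
            // GetConsumingEnumerable blocks until items arrive, and the loop
            // ends once CompleteAdding() is called and the queue drains.
            foreach (string item in queue.GetConsumingEnumerable())
            {
                count++;
            }
            return count;
        });

        for (int i = 0; i < 1000; i++) queue.Add("Hello");
        queue.CompleteAdding();   // signal "no more data" -- no halt flag needed

        Console.WriteLine(consumer.Result);   // 1000
    }
}
```

For workloads where consumers mostly sit idle waiting for data, this blocking style is usually a better fit than the poll-and-sleep loop used in the test above.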

I love the new concurrent collections: they keep your code so much simpler by not littering it with locking logic, and they perform much better.  All in all, a great new toy to add to your arsenal of multi-threaded processing!

© Geeks with Blogs or respective owner