In the first week of concurrent collections, we began with a general introduction and discussed the ConcurrentStack<T> and ConcurrentQueue<T>. The last post discussed the ConcurrentDictionary<TKey, TValue>. Finally this week, we shall close with a discussion of the ConcurrentBag<T> and BlockingCollection<T>.
For more of the "Little Wonders" posts, see C#/.NET Little Wonders: A Redux.
Recap
As you'll recall from the previous posts, the original collections were object-based containers that accomplished synchronization through a Synchronized member. With the advent of .NET 2.0, the original collections were succeeded by the generic collections, which are fully type-safe but eschew automatic synchronization.
With .NET 4.0, a new breed of collections was born in the System.Collections.Concurrent namespace. Of these, the final concurrent collection we will examine is the ConcurrentBag, along with a very useful wrapper class called the BlockingCollection.
For some excellent information on the performance of the concurrent collections and how they compare to a traditional brute-force locking strategy, see this informative whitepaper by the Microsoft Parallel Computing Platform team here.
ConcurrentBag<T> – Thread-safe unordered collection
Unlike the other concurrent collections, the ConcurrentBag<T> has no non-concurrent counterpart in the .NET collections libraries. Items can be added to and removed from a bag just like any other collection, but unlike the other collections, the items are not maintained in any order. This makes the bag handy for those cases when all you care about is that the data be consumed eventually, without regard for order of consumption or even fairness. That is, given the right circumstances, new items could be consumed before older items for a period of time.
So why would you ever want a container that can be unfair?
Well, to look at it another way, you can use a ConcurrentQueue and get the fairness, but it comes at a cost: the ordering rules and the synchronization required to maintain that ordering can affect scalability a bit. Thus the bag is great when you want the fastest way to get the next item to process and don’t care which item it is or how long it’s been waiting.
The ConcurrentBag works by taking advantage of the new ThreadLocal<T> type (new in System.Threading for .NET 4.0) so that each thread using the bag has a list local to just that thread. This means that adding to or removing from a thread-local list requires very little synchronization. The problem comes when a thread goes to consume an item but its local list is empty. In this case the bag performs “work-stealing”, where it robs an item from another thread that has items in its list. This requires a higher level of synchronization, which adds a bit of overhead to the take operation.
So, as you can imagine, this makes the ConcurrentBag good for situations where each thread both produces and consumes items from the bag, but it would be less than ideal in situations where some threads are dedicated producers and the other threads are dedicated consumers, because the work-stealing synchronization would outweigh the thread-local optimization for a thread taking its own items.
Like the other concurrent collections, there are some curiosities to keep in mind:
IsEmpty, Count, ToArray(), and GetEnumerator() lock the collection
Each of these needs a consistent view of the whole bag (for example, to determine whether it is empty), so they tend to be more expensive and can cause Add() and TryTake() operations to block.
ToArray() and GetEnumerator() are static snapshots
Because they are based on a snapshot, they will not show updates made after the snapshot was taken.
Add() is lightweight
Since it adds to the thread-local list, there is very little overhead on Add().
TryTake() is lightweight if items are in the thread-local list
As long as items are in the thread-local list, TryTake() is very lightweight, much more so than the equivalent operations on ConcurrentStack and ConcurrentQueue; however, if the local list is empty, it must steal work from another thread, which is more expensive.
Remember, a bag is not ideal for all situations; it is mainly ideal for situations where a process consumes an item and either decomposes it into more items to be processed, or handles the item partially and places it back to be processed again until it is complete. The main point is that the bag works best when each thread both takes and adds items.
For example, we could create a totally contrived example where we want to find the highest power of a number that stays under a certain threshold. Yes, obviously we could easily do this with a log function, but bear with me while I use this contrived example for simplicity.
So let’s say we have a work function that will take a Tuple out of a bag, and this Tuple will contain two ints. The first int is the original number, and the second int is the highest exponent tried so far. So we could load our bag with the initial values (let’s say we want to know the highest power of each of 2, 3, 5, and 7 under 100):
    var bag = new ConcurrentBag<Tuple<int, int>>
    {
        Tuple.Create(2, 1),
        Tuple.Create(3, 1),
        Tuple.Create(5, 1),
        Tuple.Create(7, 1)
    };
Then we can create a method that, given the bag, will take out an item, apply the multiplier again, and either put the item back with the higher exponent or report the result:
    public static void FindHighestPowerUnder(ConcurrentBag<Tuple<int,int>> bag, int threshold)
    {
        Tuple<int,int> pair;

        // while there are items to take, this will prefer local first, then steal if no local
        while (bag.TryTake(out pair))
        {
            // look at next power (Item1 is the base, Item2 is the last exponent that fit)
            var result = Math.Pow(pair.Item1, pair.Item2 + 1);

            if (result < threshold)
            {
                // if smaller than threshold bump power by 1 and put the item back
                bag.Add(Tuple.Create(pair.Item1, pair.Item2 + 1));
            }
            else
            {
                // otherwise, we're done with this base
                Console.WriteLine("Highest power of {0} under {3} is {0}^{1} = {2}.",
                    pair.Item1, pair.Item2, Math.Pow(pair.Item1, pair.Item2), threshold);
            }
        }
    }
Now that we have this, we can load up this method as an Action into our Tasks and run it:
    // create array of tasks, start all, wait for all
    var tasks = new[]
    {
        new Task(() => FindHighestPowerUnder(bag, 100)),
        new Task(() => FindHighestPowerUnder(bag, 100)),
    };

    Array.ForEach(tasks, t => t.Start());

    Task.WaitAll(tasks);
Totally contrived, I know, but keep in mind the main point! When you have a thread or task that operates on an item, and then puts it back for further consumption – or decomposes an item into further sub-items to be processed – you should consider a ConcurrentBag as the thread-local lists will allow for quick processing. However, if you need ordering or if your processes are dedicated producers or consumers, this collection is not ideal. As with anything, you should performance test as your mileage will vary depending on your situation!
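To make the earlier point about snapshots a bit more concrete, here is a minimal sketch. The class and method names (BagSnapshotDemo, SnapshotThenAdd) are made up for illustration and are not part of the framework. It only shows that an array returned by ToArray() does not reflect items added afterwards; it says nothing about ordering, which the bag does not promise.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class; the names here are illustrative only.
public static class BagSnapshotDemo
{
    // Returns (number of items in the snapshot, number of items in the bag afterwards).
    public static Tuple<int, int> SnapshotThenAdd()
    {
        var bag = new ConcurrentBag<int> { 1, 2, 3 };

        // ToArray() copies the contents of the bag at this moment.
        int[] snapshot = bag.ToArray();

        // Items added after the snapshot was taken do not show up in it.
        bag.Add(4);
        bag.Add(5);

        return Tuple.Create(snapshot.Length, bag.Count);
    }

    public static void Main()
    {
        var counts = SnapshotThenAdd();

        // prints "Snapshot saw 3 items; the bag now holds 5."
        Console.WriteLine("Snapshot saw {0} items; the bag now holds {1}.",
            counts.Item1, counts.Item2);
    }
}
```

The same applies to GetEnumerator(): a foreach over the bag walks the contents as they were when the enumeration started.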
BlockingCollection<T> – A producers & consumers pattern collection
The BlockingCollection<T> can be treated like a collection in its own right, but in reality it adds a producers and consumers paradigm to any collection that implements the interface IProducerConsumerCollection<T>. If you don’t specify one at the time of construction, it will use a ConcurrentQueue<T> as its underlying store.
If you don’t want to use the ConcurrentQueue, the ConcurrentStack and ConcurrentBag also implement the interface (though ConcurrentDictionary does not). In addition, you are of course free to create your own implementation of the interface.
So, for those who don’t remember the producers and consumers classical computer-science problem, the gist of it is that you have one (or more) processes that are creating items (producers) and one (or more) processes that are consuming these items (consumers). Now, the crux of the problem is that there is a bin (queue) where the produced items are placed, and typically that bin has a limited size. Thus if a producer creates an item, but there is no space to store it, it must wait until an item is consumed. Also if a consumer goes to consume an item and none exists, it must wait until an item is produced.
The BlockingCollection makes it trivial to implement any standard producers/consumers process set by providing that “bin” where the items can be produced into and consumed from with the appropriate blocking operations. In addition, you can specify whether the bin should have a limited size or can be (theoretically) unbounded, and you can specify timeouts on the blocking operations.
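As a rough sketch of the bounded bin and the timeout behavior just described, the following fills a bin of capacity 2 from a single thread with no consumer running. The class and method names (BoundedBinDemo, FillPastCapacity) and the 50 ms timeout are arbitrary choices for illustration.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class; the names and the timeout value are illustrative only.
public static class BoundedBinDemo
{
    // Returns the results of four TryAdd() calls against a bin with capacity 2.
    public static bool[] FillPastCapacity()
    {
        using (var bin = new BlockingCollection<int>(2))
        {
            bool first = bin.TryAdd(1);    // room available
            bool second = bin.TryAdd(2);   // the bin is now full

            // Nothing is consuming, so this waits for the timeout and then gives up.
            bool third = bin.TryAdd(3, TimeSpan.FromMilliseconds(50));

            bin.Take();                    // frees one slot
            bool fourth = bin.TryAdd(4);   // succeeds again

            return new[] { first, second, third, fourth };
        }
    }

    public static void Main()
    {
        bool[] r = FillPastCapacity();

        // prints "True, True, False, True"
        Console.WriteLine("{0}, {1}, {2}, {3}", r[0], r[1], r[2], r[3]);
    }
}
```

Calling Add() instead of the third TryAdd() would block forever in this single-threaded sketch, which is exactly the wait the bounded bin imposes on a producer that gets ahead of its consumers.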
As far as your choice of “bin”, for the most part the ConcurrentQueue is the right choice because it is fairly light and maximizes fairness by ordering items so that they are consumed in the same order they are produced. You can use the concurrent bag or stack, of course, but your ordering would be random-ish in the case of the former and LIFO in the case of the latter.
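To illustrate how the choice of bin changes the order of consumption, here is a small sketch that adds and drains from a single thread, so the ordering is deterministic. The names BackingStoreDemo and AddThenDrain are hypothetical. The bag is left out because its ordering is not something to rely on.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical demo class; the names here are illustrative only.
public static class BackingStoreDemo
{
    // Adds 1, 2, 3 to the bin, then takes everything back out in whatever
    // order the underlying store hands the items over.
    public static List<int> AddThenDrain(BlockingCollection<int> bin)
    {
        for (int i = 1; i <= 3; i++)
        {
            bin.Add(i);
        }

        var taken = new List<int>();
        int item;

        // TryTake() with no timeout returns false as soon as the bin is empty.
        while (bin.TryTake(out item))
        {
            taken.Add(item);
        }

        return taken;
    }

    public static void Main()
    {
        // No store specified: a ConcurrentQueue<T> is used, so items come out FIFO.
        var queueBacked = new BlockingCollection<int>();

        // Any IProducerConsumerCollection<T> can be passed in; a stack gives LIFO.
        var stackBacked = new BlockingCollection<int>(new ConcurrentStack<int>());

        // prints "Queue-backed: 1, 2, 3"
        Console.WriteLine("Queue-backed: {0}", string.Join(", ", AddThenDrain(queueBacked)));

        // prints "Stack-backed: 3, 2, 1"
        Console.WriteLine("Stack-backed: {0}", string.Join(", ", AddThenDrain(stackBacked)));
    }
}
```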
So let’s look at some of the methods of note in BlockingCollection:
BoundedCapacity returns the capacity of the “bin”.
If the bin is unbounded, the capacity is int.MaxValue.
Count returns an internally-kept count of items.
This makes it O(1), but it is unreliable if you modify the underlying collection directly (not recommended).
CompleteAdding() is used to cut off further adds.
This sets IsAddingCompleted and begins to wind down consumers once the bin is empty.
IsAddingCompleted is true when producers are “done”.
Once you are done producing, you should call CompleteAdding() to alert consumers.
IsCompleted is true when producers are “done” and the “bin” is empty.
Once you mark the producers done and all items have been removed, this will be true.
Add() is a blocking add to the collection.
If the bin is full, it will wait until space frees up.
Take() is a blocking remove from the collection.
If the bin is empty, it will wait until an item is produced or adding is completed.
GetConsumingEnumerable() is used to iterate and consume items.
Unlike the standard enumerator, this one removes each item from the collection as it iterates.
TryAdd() attempts to add but does not block indefinitely.
If adding would block, it returns false instead; you can specify a TimeSpan to wait before giving up.
TryTake() attempts to take but does not block indefinitely.
Like TryAdd(), if taking would block, it returns false instead; you can specify a TimeSpan to wait.
Note the use of CompleteAdding() to signal the BlockingCollection that nothing else should be added. This means that any attempt to TryAdd() or Add() after the collection is marked complete will throw an InvalidOperationException. In addition, once adding is complete you can still continue to TryTake() and Take() until the bin is empty; after that, Take() will throw an InvalidOperationException and TryTake() will return false.
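The completion rules above can be sketched from a single thread as follows. The names CompleteAddingDemo and Observe are made up for this illustration; the sketch simply records what each call does once CompleteAdding() has been invoked.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical demo class; the names here are illustrative only.
public static class CompleteAddingDemo
{
    // Records what each operation does after CompleteAdding() has been called.
    public static List<string> Observe()
    {
        var log = new List<string>();

        using (var bin = new BlockingCollection<int>())
        {
            bin.Add(1);
            bin.CompleteAdding();

            // Adding after completion is an error.
            try { bin.Add(2); }
            catch (InvalidOperationException) { log.Add("Add threw"); }

            // The item added before completion can still be taken.
            log.Add("Took " + bin.Take());

            // Adding is complete and the bin is empty.
            log.Add("IsCompleted = " + bin.IsCompleted);

            // TryTake() reports failure quietly...
            int item;
            log.Add("TryTake returned " + bin.TryTake(out item));

            // ...while Take() throws instead of waiting forever.
            try { bin.Take(); }
            catch (InvalidOperationException) { log.Add("Take threw"); }
        }

        return log;
    }

    public static void Main()
    {
        // prints, one per line:
        //   Add threw
        //   Took 1
        //   IsCompleted = True
        //   TryTake returned False
        //   Take threw
        foreach (string line in Observe())
        {
            Console.WriteLine(line);
        }
    }
}
```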
So let’s create a simple program to try this out. Let’s say that you have one process that will be producing items, but a slower consumer process that handles them. This gives us a chance to peek inside what happens when the bin is bounded (by default, the bin is NOT bounded).
    var bin = new BlockingCollection<int>(5);
Now, we create a method to produce items:
    public static void ProduceItems(BlockingCollection<int> bin, int numToProduce)
    {
        for (int i = 0; i < numToProduce; i++)
        {
            // try for 10 ms to add an item
            while (!bin.TryAdd(i, TimeSpan.FromMilliseconds(10)))
            {
                Console.WriteLine("Bin is full, retrying...");
            }
        }

        // once done producing, call CompleteAdding()
        Console.WriteLine("Adding is completed.");
        bin.CompleteAdding();
    }
And one to consume them:
    public static void ConsumeItems(BlockingCollection<int> bin)
    {
        // This will only be true if CompleteAdding() was called AND the bin is empty.
        while (!bin.IsCompleted)
        {
            int item;

            if (!bin.TryTake(out item, TimeSpan.FromMilliseconds(10)))
            {
                Console.WriteLine("Bin is empty, retrying...");
            }
            else
            {
                Console.WriteLine("Consuming item {0}.", item);
                Thread.Sleep(TimeSpan.FromMilliseconds(20));
            }
        }
    }
Then we can fire them off:
    // create one producer and two consumers
    var tasks = new[]
    {
        new Task(() => ProduceItems(bin, 20)),
        new Task(() => ConsumeItems(bin)),
        new Task(() => ConsumeItems(bin)),
    };

    Array.ForEach(tasks, t => t.Start());

    Task.WaitAll(tasks);
Notice that the producer is faster than the consumer, thus it should be hitting a full bin often and displaying the message after it times out on TryAdd().
    Consuming item 0.
    Consuming item 1.
    Bin is full, retrying...
    Bin is full, retrying...
    Consuming item 3.
    Consuming item 2.
    Bin is full, retrying...
    Consuming item 4.
    Consuming item 5.
    Bin is full, retrying...
    Consuming item 6.
    Consuming item 7.
    Bin is full, retrying...
    Consuming item 8.
    Consuming item 9.
    Bin is full, retrying...
    Consuming item 10.
    Consuming item 11.
    Bin is full, retrying...
    Consuming item 12.
    Consuming item 13.
    Bin is full, retrying...
    Bin is full, retrying...
    Consuming item 14.
    Adding is completed.
    Consuming item 15.
    Consuming item 16.
    Consuming item 17.
    Consuming item 19.
    Consuming item 18.
Also notice that once CompleteAdding() is called and the bin is empty, the IsCompleted property returns true, and the consumers will exit.
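As an alternative to polling IsCompleted with TryTake(), the consumer can be written with GetConsumingEnumerable(), which was listed among the methods earlier. The following is only a sketch under that assumption; the names ConsumingEnumerableDemo and ProduceAndSum are hypothetical, and the consumer sums the items rather than printing them so the result is easy to check.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo class; the names here are illustrative only.
public static class ConsumingEnumerableDemo
{
    // Produces 0..numToProduce-1 into a bounded bin and returns the sum the consumer saw.
    public static int ProduceAndSum(int numToProduce)
    {
        using (var bin = new BlockingCollection<int>(5))
        {
            var producer = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < numToProduce; i++)
                {
                    bin.Add(i);   // blocks while the bin is full
                }

                // Without this call the consumer's foreach would never end.
                bin.CompleteAdding();
            });

            var consumer = Task.Factory.StartNew(() =>
            {
                int sum = 0;

                // Removes items as it goes, blocks while the bin is empty,
                // and finishes once adding is complete and the bin is drained.
                foreach (int item in bin.GetConsumingEnumerable())
                {
                    sum += item;
                }

                return sum;
            });

            Task.WaitAll(producer, consumer);
            return consumer.Result;
        }
    }

    public static void Main()
    {
        // prints "Sum of consumed items: 190"
        Console.WriteLine("Sum of consumed items: {0}", ProduceAndSum(20));
    }
}
```

The trade-off is that the foreach version has no place to report "bin is empty, retrying..."; it simply waits, so the polling loop is still the better fit when you want to do something else between attempts.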
Summary
The ConcurrentBag is an interesting collection that can be used to optimize concurrency scenarios where tasks or threads both produce and consume items. In this way, it will choose to consume its own work if available, and then steal if not. However, in situations where you want fair consumption or ordering, or in situations where the producers and consumers are distinct processes, the bag is not optimal.
The BlockingCollection is a great wrapper around the concurrent queue, stack, or bag that lets you add producer and consumer semantics easily, including waiting when the bin is full or empty.
That’s the end of my dive into the concurrent collections. I’d also strongly recommend, once again, you read this excellent Microsoft white paper that goes into much greater detail on the efficiencies you can gain using these collections judiciously (here).