Parallelism in .NET – Part 4, Imperative Data Parallelism: Aggregation
- by Reed
In the article on simple data parallelism, I described how to perform an operation on an entire collection of elements in parallel. Often, this is not adequate, as the parallel operation is going to be performing some form of aggregation.
Simple examples of this might include taking the sum of the results of processing a function on each element in the collection, or finding the minimum of the collection given some criteria. This can be done using the techniques described in simple data parallelism, however, special care needs to be taken into account to synchronize the shared data appropriately. The Task Parallel Library has tools to assist in this synchronization.
The main issue with aggregation when parallelizing a routine is that you need to handle synchronization of data. Since multiple threads will need to write to a shared portion of data. Suppose, for example, that we wanted to parallelize a simple loop that looked for the minimum value within a dataset:
double min = double.MaxValue;
foreach(var item in collection)
{
double value = item.PerformComputation();
min = System.Math.Min(min, value);
}
.csharpcode, .csharpcode pre
{
font-size: small;
color: black;
font-family: consolas, "Courier New", courier, monospace;
background-color: #ffffff;
/*white-space: pre;*/
}
.csharpcode pre { margin: 0em; }
.csharpcode .rem { color: #008000; }
.csharpcode .kwrd { color: #0000ff; }
.csharpcode .str { color: #006080; }
.csharpcode .op { color: #0000c0; }
.csharpcode .preproc { color: #cc6633; }
.csharpcode .asp { background-color: #ffff00; }
.csharpcode .html { color: #800000; }
.csharpcode .attr { color: #ff0000; }
.csharpcode .alt
{
background-color: #f4f4f4;
width: 100%;
margin: 0em;
}
.csharpcode .lnum { color: #606060; }
This seems like a good candidate for parallelization, but there is a problem here. If we just wrap this into a call to Parallel.ForEach, we’ll introduce a critical race condition, and get the wrong answer. Let’s look at what happens here:
// Buggy code! Do not use!
double min = double.MaxValue;
Parallel.ForEach(collection, item =>
{
double value = item.PerformComputation();
min = System.Math.Min(min, value);
});
This code has a fatal flaw: min will be checked, then set, by multiple threads simultaneously. Two threads may perform the check at the same time, and set the wrong value for min. Say we get a value of 1 in thread 1, and a value of 2 in thread 2, and these two elements are the first two to run. If both hit the min check line at the same time, both will determine that min should change, to 1 and 2 respectively. If element 1 happens to set the variable first, then element 2 sets the min variable, we’ll detect a min value of 2 instead of 1. This can lead to wrong answers.
Unfortunately, fixing this, with the Parallel.ForEach call we’re using, would require adding locking. We would need to rewrite this like:
// Safe, but slow
double min = double.MaxValue;
// Make a "lock" object
object syncObject = new object();
Parallel.ForEach(collection, item =>
{
double value = item.PerformComputation();
lock(syncObject)
min = System.Math.Min(min, value);
});
This will potentially add a huge amount of overhead to our calculation. Since we can potentially block while waiting on the lock for every single iteration, we will most likely slow this down to where it is actually quite a bit slower than our serial implementation. The problem is the lock statement – any time you use lock(object), you’re almost assuring reduced performance in a parallel situation. This leads to two observations I’ll make:
When parallelizing a routine, try to avoid locks.
That being said:
Always add any and all required synchronization to avoid race conditions.
These two observations tend to be opposing forces – we often need to synchronize our algorithms, but we also want to avoid the synchronization when possible. Looking at our routine, there is no way to directly avoid this lock, since each element is potentially being run on a separate thread, and this lock is necessary in order for our routine to function correctly every time.
However, this isn’t the only way to design this routine to implement this algorithm. Realize that, although our collection may have thousands or even millions of elements, we have a limited number of Processing Elements (PE). Processing Element is the standard term for a hardware element which can process and execute instructions. This typically is a core in your processor, but many modern systems have multiple hardware execution threads per core. The Task Parallel Library will not execute the work for each item in the collection as a separate work item. Instead, when Parallel.ForEach executes, it will partition the collection into larger “chunks” which get processed on different threads via the ThreadPool. This helps reduce the threading overhead, and help the overall speed. In general, the Parallel class will only use one thread per PE in the system.
Given the fact that there are typically fewer threads than work items, we can rethink our algorithm design. We can parallelize our algorithm more effectively by approaching it differently. Because the basic aggregation we are doing here (Min) is communitive, we do not need to perform this in a given order. We knew this to be true already – otherwise, we wouldn’t have been able to parallelize this routine in the first place. With this in mind, we can treat each thread’s work independently, allowing each thread to serially process many elements with no locking, then, after all the threads are complete, “merge” together the results.
This can be accomplished via a different set of overloads in the Parallel class: Parallel.ForEach<TSource,TLocal>. The idea behind these overloads is to allow each thread to begin by initializing some local state (TLocal). The thread will then process an entire set of items in the source collection, providing that state to the delegate which processes an individual item. Finally, at the end, a separate delegate is run which allows you to handle merging that local state into your final results.
To rewriting our routine using Parallel.ForEach<TSource,TLocal>, we need to provide three delegates instead of one. The most basic version of this function is declared as:
public static ParallelLoopResult ForEach<TSource, TLocal>(
IEnumerable<TSource> source,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally
)
The first delegate (the localInit argument) is defined as Func<TLocal>. This delegate initializes our local state. It should return some object we can use to track the results of a single thread’s operations.
The second delegate (the body argument) is where our main processing occurs, although now, instead of being an Action<T>, we actually provide a Func<TSource, ParallelLoopState, TLocal, TLocal> delegate. This delegate will receive three arguments: our original element from the collection (TSource), a ParallelLoopState which we can use for early termination, and the instance of our local state we created (TLocal). It should do whatever processing you wish to occur per element, then return the value of the local state after processing is completed.
The third delegate (the localFinally argument) is defined as Action<TLocal>. This delegate is passed our local state after it’s been processed by all of the elements this thread will handle. This is where you can merge your final results together. This may require synchronization, but now, instead of synchronizing once per element (potentially millions of times), you’ll only have to synchronize once per thread, which is an ideal situation.
Now that I’ve explained how this works, lets look at the code:
// Safe, and fast!
double min = double.MaxValue;
// Make a "lock" object
object syncObject = new object();
Parallel.ForEach(
collection,
// First, we provide a local state initialization delegate.
() => double.MaxValue,
// Next, we supply the body, which takes the original item, loop state,
// and local state, and returns a new local state
(item, loopState, localState) =>
{
double value = item.PerformComputation();
return System.Math.Min(localState, value);
},
// Finally, we provide an Action<TLocal>, to "merge" results together
localState =>
{
// This requires locking, but it's only once per used thread
lock(syncObj)
min = System.Math.Min(min, localState);
}
);
Although this is a bit more complicated than the previous version, it is now both thread-safe, and has minimal locking.
This same approach can be used by Parallel.For, although now, it’s Parallel.For<TLocal>. When working with Parallel.For<TLocal>, you use the same triplet of delegates, with the same purpose and results.
Also, many times, you can completely avoid locking by using a method of the Interlocked class to perform the final aggregation in an atomic operation. The MSDN example demonstrating this same technique using Parallel.For uses the Interlocked class instead of a lock, since they are doing a sum operation on a long variable, which is possible via Interlocked.Add.
By taking advantage of local state, we can use the Parallel class methods to parallelize algorithms such as aggregation, which, at first, may seem like poor candidates for parallelization. Doing so requires careful consideration, and often requires a slight redesign of the algorithm, but the performance gains can be significant if handled in a way to avoid excessive synchronization.