Parallelism in .NET – Part 3, Imperative Data Parallelism: Early Termination
- by Reed
Although simple data parallelism allows us to easily parallelize many of our iteration statements, there are cases that it does not handle well. In my previous discussion, I focused on data parallelism with no shared state, where every element is processed in exactly the same way.
Unfortunately, there are many common cases where this does not happen. If we are dealing with a loop that requires early termination, extra care is required when parallelizing.
Often, while processing in a loop, once a certain condition is met, it is no longer necessary to continue processing. This may be a matter of finding a specific element within the collection, or reaching some error case. The important distinction here is that it is often impossible to know, until runtime, which set of elements needs to be processed.
In my initial discussion of data parallelism, I mentioned that this technique is a candidate when you can decompose the problem based on the data involved, and you wish to apply a single operation concurrently on all of the elements of a collection. This covers many of the potential cases, but sometimes, after processing some of the elements, we need to stop processing.
As an example, let’s go back to our previous Parallel.ForEach example of contacting a customer. This time, we’ll change the requirements slightly: if the store is unable to email the customer, we will exit gracefully. The thinking here, of course, is that if the store is currently unable to email, the next time this operation runs it will handle the same situation, so we can just skip our processing entirely. The original, serial case, with this extra condition, might look something like the following:
foreach (var customer in customers)
{
    // Run some process that takes some time...
    DateTime lastContact = theStore.GetLastContact(customer);
    TimeSpan timeSinceContact = DateTime.Now - lastContact;

    // If it's been more than two weeks, send an email, and update...
    if (timeSinceContact.Days > 14)
    {
        // Exit gracefully if we fail to email, since this
        // entire process can be repeated later without issue.
        if (theStore.EmailCustomer(customer) == false)
            break;

        customer.LastEmailContact = DateTime.Now;
    }
}
Here, we’re processing our loop, but at any point, if we fail to send our email successfully, we just abandon the process and assume it will get handled correctly the next time our routine is run. If we try to parallelize this using Parallel.ForEach, as we did previously, we’ll run into an error almost immediately: the break statement we’re using is only valid when enclosed within an iteration statement, such as foreach. When we switch to Parallel.ForEach, we’re no longer within an iteration statement – we’re inside a delegate being invoked by a method.
This needs to be handled slightly differently when parallelized. Instead of using the break statement, we need to utilize a new class in the Task Parallel Library: ParallelLoopState. The ParallelLoopState class gives concurrently running loop bodies a way to interact with each other, and provides us with a way to break out of a loop. In order to use this, we use a different overload of Parallel.ForEach which takes an IEnumerable&lt;T&gt; and an Action&lt;T, ParallelLoopState&gt; instead of an Action&lt;T&gt;. Using this, we can parallelize the above operation as follows:
Parallel.ForEach(customers, (customer, parallelLoopState) =>
{
    // Run some process that takes some time...
    DateTime lastContact = theStore.GetLastContact(customer);
    TimeSpan timeSinceContact = DateTime.Now - lastContact;

    // If it's been more than two weeks, send an email, and update...
    if (timeSinceContact.Days > 14)
    {
        // Exit gracefully if we fail to email, since this
        // entire process can be repeated later without issue.
        if (theStore.EmailCustomer(customer) == false)
            parallelLoopState.Break();
        else
            customer.LastEmailContact = DateTime.Now;
    }
});
There are a couple of important points here. First, we didn’t actually instantiate the ParallelLoopState instance; it was provided directly to us by the Parallel class. All we needed to do was change our lambda expression to reflect that we want to use the loop state, and the Parallel class creates an instance for our use. We also needed to change our logic slightly when we call Break(). Since Break() doesn’t stop the program flow within our block, we needed to add an else case so that we only set the property on customer when we succeed. This same technique can be used to break out of a Parallel.For loop, as sketched below.
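For illustration, here is a minimal sketch of the same technique applied to Parallel.For; the array, the computation, and the failure condition are all made up for this example. The overload used here takes an Action&lt;int, ParallelLoopState&gt;:

int[] results = new int[100];

Parallel.For(0, results.Length, (i, loopState) =>
{
    // Stand-in for real per-index work; invented for this sketch.
    int value = i * 3;

    // Hypothetical failure condition: request early termination.
    // Iterations with lower indices are still guaranteed to complete.
    if (value > 250)
    {
        loopState.Break();
        return;
    }

    results[i] = value;
});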
That being said, there is a huge difference between using ParallelLoopState to cause early termination and using break in a standard iteration statement. When dealing with a loop serially, break will immediately terminate the processing within the closest enclosing loop statement. Calling ParallelLoopState.Break(), however, has very different behavior.
The issue is that, now, we’re no longer processing one element at a time. If we break in one of our threads, there are other threads that will likely still be executing. This leads to an important observation about termination of parallel code:
Early termination in parallel routines is not immediate. Code will continue to run after you request a termination.
This may seem problematic at first, but it is something you just need to keep in mind while designing your routine. ParallelLoopState.Break() should be thought of as a request. We are telling the runtime that no elements that were in the collection past the element we’re currently processing need to be processed, and leaving it up to the runtime to decide how to handle this as gracefully as possible. Far from being a flaw, this is a good thing. If the runtime tried to immediately stop processing, many of our elements would be partially processed. It would be like putting a return statement at a random location in our loop body – which could have horrific consequences for our code’s maintainability.
In order to understand and effectively write parallel routines, we, as developers, need to make a subtle but profound shift in our thinking. We can no longer think in terms of sequential processes; instead, we need to think in terms of requests to the system that may be handled differently than we’d first expect. This is more natural to developers who have dealt with asynchronous models previously, but it is an important distinction when moving to concurrent programming models.
As an example, consider the Break() method. ParallelLoopState.Break() functions in a way that may be unexpected at first. When you call Break() from a loop body, the runtime will continue to process all elements of the collection that occur prior to the element that was being processed when Break() was called. This is done to keep the behavior of the Break() method as close to the behavior of the break statement as possible. We can see the behavior in this simple code:
var collection = Enumerable.Range(0, 20);

var pResult = Parallel.ForEach(collection, (element, state) =>
{
    if (element > 10)
    {
        Console.WriteLine("Breaking on {0}", element);
        state.Break();
    }

    Console.WriteLine(element);
});
If we run this, we get a result that may seem unexpected at first:
0
2
1
5
6
3
4
10
Breaking on 11
11
Breaking on 12
12
9
Breaking on 13
13
7
8
Breaking on 15
15
What is occurring here is that we loop until we find the first element greater than 10. In this case, that first happened when one of our threads reached element 11, and it requested that the loop stop by calling Break() at that point. The loop, however, continued processing until all of the elements prior to 11 were completed, and only then terminated. This means it guarantees that elements 9, 7, and 8 (the remaining elements below 11) complete before it stops processing. You can see that our other threads each tried to break as well, but since the lowest element on which Break() was called was 11, that call decides which elements (0-10) must be processed.
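Note that Parallel.ForEach returns a ParallelLoopResult, which the code above captured as pResult. As a small sketch, we could inspect it after the loop to see whether the loop ran to completion, and on which iteration the winning Break() occurred:

// Inspect the result of the loop above.
if (!pResult.IsCompleted)
{
    // LowestBreakIteration is null if Stop() (rather than Break()) ended the loop.
    Console.WriteLine("Loop ended early; lowest break iteration: {0}",
                      pResult.LowestBreakIteration);
}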
If this behavior is not desirable, there is another option. Instead of calling ParallelLoopState.Break(), you can call ParallelLoopState.Stop(). The Stop() method requests that the runtime terminate as soon as possible, without guaranteeing that any other elements are processed. Stop() will not interrupt an element that is already being processed, but it will prevent new elements, even ones found earlier in the collection, from being started. Also, once Stop() is called, the ParallelLoopState’s IsStopped property returns true. This lets longer-running processes poll for the value and return after performing any necessary cleanup.
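For example, a parallel search might look something like the following sketch; the collection and the match condition are made up for illustration:

var numbers = Enumerable.Range(0, 1000);
int found = -1;

Parallel.ForEach(numbers, (n, state) =>
{
    // If another iteration has already stopped the loop, bail out early.
    if (state.IsStopped)
        return;

    // Hypothetical match condition.
    if (n == 573)
    {
        found = n;    // Safe without locking here, since only one element matches.
        state.Stop(); // No other elements need to be processed.
    }
});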
The basic rule of thumb for choosing between Break() and Stop() is the following:

- Use ParallelLoopState.Stop() when possible, since it terminates more quickly. This is particularly useful when you are searching for an element or a condition in the collection: once you’ve found it, you do not need to do any other processing, so Stop() is more appropriate.
- Use ParallelLoopState.Break() if you need to more closely match the behavior of the C# break statement.
Both methods behave differently from the C# break statement. Unfortunately, when parallelizing a routine, more thought and care need to be put into every aspect of it than you might otherwise expect. This is due to my second observation:
Parallelizing a routine will almost always change its behavior.
This sounds crazy at first, but it’s a concept so simple it’s easy to forget. We’re purposely telling the system to process more than one thing at the same time, which means that the sequence in which things get processed is no longer deterministic. It is easy to change the behavior of your routine in very subtle ways by introducing parallelism. Often these changes are unavoidable, even if they don’t have any adverse side effects. This leads to my final observation for this post:
Parallelization is something that should be handled with care and forethought, added by design, and not just introduced casually.