Many routines are parallelized because they are long-running processes. When writing an algorithm that will run for a long period of time, it's typically good practice to allow that routine to be cancelled. I previously discussed terminating a parallel loop from within, but have not demonstrated how a routine can be cancelled from the caller’s perspective. Cancellation in PLINQ and the Task Parallel Library is handled through a new, unified cooperative cancellation model introduced with .NET 4.0.
Cancellation in .NET 4 is based around a new, lightweight struct called CancellationToken. A CancellationToken is a small, thread-safe value type which is generated via a CancellationTokenSource. There are many goals which led to this design. For our purposes, we will focus on a couple of specific design decisions:
Cancellation is cooperative. A calling method can request a cancellation, but it’s up to the processing routine to terminate – it is not forced.
Cancellation is consistent. A single method call requests a cancellation on every copied CancellationToken in the routine.
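The "consistent" goal above can be seen in a minimal sketch. The class and method names here (TokenBasics, ObserveCancellation) are made up for illustration; only CancellationTokenSource and CancellationToken come from the framework. Two copies of a token are taken, and a single Cancel() call on the source is observed through both:

```csharp
using System;
using System.Threading;

public static class TokenBasics
{
    // Hypothetical helper: returns the state seen before the cancel request,
    // followed by the state seen through each token copy afterwards.
    public static bool[] ObserveCancellation()
    {
        using (var cts = new CancellationTokenSource())
        {
            CancellationToken first = cts.Token;
            CancellationToken second = first; // struct copy, still tied to the same source

            bool before = first.IsCancellationRequested || second.IsCancellationRequested;

            // One call on the source requests cancellation...
            cts.Cancel();

            // ...and every copy of the token observes the request.
            return new[] { before, first.IsCancellationRequested, second.IsCancellationRequested };
        }
    }

    public static void Main()
    {
        bool[] states = ObserveCancellation();
        Console.WriteLine("Before: {0}, first: {1}, second: {2}", states[0], states[1], states[2]);
    }
}
```

Note that nothing here stops any running code. The tokens only report that a cancellation was requested, which is the cooperative part of the model.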
Let’s begin by looking at how we can cancel a PLINQ query. Suppose we wanted to provide the option to cancel our query from Part 6:
double min = collection
    .AsParallel()
    .Min(item => item.PerformComputation());
We would rewrite this to allow for cancellation by adding a call to ParallelEnumerable.WithCancellation as follows:
var cts = new CancellationTokenSource();

// Pass cts here to a routine that could,
// in parallel, request a cancellation

try
{
    double min = collection
        .AsParallel()
        .WithCancellation(cts.Token)
        .Min(item => item.PerformComputation());
}
catch (OperationCanceledException)
{
    // Query was cancelled before it finished
}
Here, if the user calls cts.Cancel() before the PLINQ query completes, the query will stop processing, and an OperationCanceledException will be raised.
Be aware, however, that cancellation will not be instantaneous. When cts.Cancel() is called, the query will only stop after the calls to item.PerformComputation() that are already in progress have finished. cts.Cancel() will prevent PLINQ from scheduling new work for further elements, but will not stop items which are currently being processed. This goes back to the first goal I mentioned: cancellation is cooperative. Here, we’re requesting the cancellation, but it’s up to PLINQ to terminate.
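As a rough, self-contained sketch of this behaviour, the program below cancels a query from a second thread. SlowComputation is a hypothetical stand-in for item.PerformComputation(), and the item count and sleep durations are arbitrary values chosen so that the cancel request should arrive well before the query could finish:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PlinqCancellationDemo
{
    // Hypothetical stand-in for item.PerformComputation():
    // slow, and with no cancellation check of its own.
    static double SlowComputation(int value)
    {
        Thread.Sleep(5);
        return value * 0.5;
    }

    // Returns true if the query ended with an OperationCanceledException.
    public static bool RunAndCancel()
    {
        using (var cts = new CancellationTokenSource())
        {
            // Request cancellation from another thread shortly after the query starts.
            Task canceller = Task.Factory.StartNew(() =>
            {
                Thread.Sleep(100);
                cts.Cancel();
            });

            try
            {
                double min = Enumerable.Range(0, 20000)
                    .AsParallel()
                    .WithCancellation(cts.Token)
                    .Min(i => SlowComputation(i));

                Console.WriteLine("Query finished, minimum = {0}", min);
                return false;
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Query was cancelled before it finished");
                return true;
            }
            finally
            {
                // Make sure the cancelling task is done before cts is disposed.
                canceller.Wait();
            }
        }
    }

    public static void Main()
    {
        RunAndCancel();
    }
}
```

Because SlowComputation never looks at the token, any call that has already started runs to completion. The exception only surfaces once PLINQ itself notices the request.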
If we wanted to allow cancellation to occur within our routine, we would need to change our routine to accept a CancellationToken, and modify it to handle this specific case:
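public double PerformComputation(CancellationToken token)
{
    for (int i = 0; i < this.iterations; ++i)
    {
        // Add a check to see if we've been canceled
        // If a cancel was requested, we'll throw here
        token.ThrowIfCancellationRequested();

        // Do our processing now
        this.RunIteration(i);
    }

    // Return a value so the method can still be used with Min()
    // (this.result is assumed to hold whatever RunIteration computes)
    return this.result;
}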
With this overload of PerformComputation, each internal iteration checks to see if a cancellation request was made, and will throw an OperationCanceledException at that point, instead of waiting until the method returns. This is good, since it allows us, as developers, to plan for cancellation, and terminate our routine in a clean, safe state.
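To make the per-iteration check concrete, here is a small sketch under some assumptions: WorkItem, CompletedIterations, and the Math.Sqrt workload are all invented for illustration and are not the routine from the earlier parts of this series. It counts how many iterations ran before the cancellation was observed:

```csharp
using System;
using System.Threading;

// Hypothetical work item; the real routine's state and workload are not shown in this article.
public class WorkItem
{
    private readonly int iterations;

    public int CompletedIterations { get; private set; }

    public WorkItem(int iterations)
    {
        this.iterations = iterations;
    }

    public double PerformComputation(CancellationToken token)
    {
        double total = 0.0;
        for (int i = 0; i < iterations; ++i)
        {
            // Throws OperationCanceledException if a cancel was requested
            token.ThrowIfCancellationRequested();

            total += Math.Sqrt(i); // placeholder for the real per-iteration work
            CompletedIterations++;
        }
        return total;
    }
}

public static class CooperativeCheckDemo
{
    // Runs the computation with an already-cancelled token and
    // reports how many iterations completed before the check fired.
    public static int IterationsBeforeCancellation()
    {
        var item = new WorkItem(1000);
        using (var cts = new CancellationTokenSource())
        {
            cts.Cancel();
            try
            {
                item.PerformComputation(cts.Token);
            }
            catch (OperationCanceledException)
            {
                // Expected: the first check inside the loop throws
            }
            return item.CompletedIterations;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Iterations completed: {0}", IterationsBeforeCancellation());
    }
}
```

Since the check sits at the top of the loop, the routine never stops halfway through an iteration, which is what lets it exit in a known state.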
To use this overload, we change our PLINQ query so that it passes the token through to each call:
try
{
    double min = collection
        .AsParallel()
        .WithCancellation(cts.Token)
        .Min(item => item.PerformComputation(cts.Token));
}
catch (OperationCanceledException)
{
    // Query was cancelled before it finished
}
PLINQ handles this exception well, too. Since the entire purpose of PLINQ is to process multiple items concurrently, there is a very good chance that several items will raise this exception. PLINQ takes all of the OperationCanceledException instances raised within these methods and merges them into a single OperationCanceledException that propagates to the caller. This is done internally because we added the call to ParallelEnumerable.WithCancellation.
If, however, a different exception is raised by any of the elements, the OperationCanceledException as well as the other Exception will be merged into a single AggregateException.
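The non-cancellation case can be sketched as follows. The failing element and the InvalidOperationException are invented for illustration; the point is only that a caller who wants to tell the two outcomes apart needs a handler for AggregateException alongside the one for OperationCanceledException:

```csharp
using System;
using System.Linq;
using System.Threading;

public static class PlinqFailureDemo
{
    // Returns "completed", "cancelled" or "failed" depending on how the query ended.
    public static string RunFailingQuery()
    {
        using (var cts = new CancellationTokenSource())
        {
            try
            {
                int min = Enumerable.Range(0, 100)
                    .AsParallel()
                    .WithCancellation(cts.Token)
                    .Min(i =>
                    {
                        // Hypothetical failure unrelated to cancellation
                        if (i == 42)
                            throw new InvalidOperationException("Item 42 failed");
                        return i;
                    });

                Console.WriteLine("Minimum: {0}", min);
                return "completed";
            }
            catch (OperationCanceledException)
            {
                // Only cancellation exceptions tied to cts.Token end up here
                return "cancelled";
            }
            catch (AggregateException ae)
            {
                // Any other failure arrives wrapped in an AggregateException
                foreach (Exception inner in ae.Flatten().InnerExceptions)
                    Console.WriteLine("{0}: {1}", inner.GetType().Name, inner.Message);
                return "failed";
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine(RunFailingQuery());
    }
}
```

No cancellation is ever requested in this sketch, so the query ends in the AggregateException handler.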
The Task Parallel Library uses the same cancellation model. Here, we supply our CancellationToken as part of the loop's configuration: the ParallelOptions class contains a CancellationToken property. This allows us to cancel a Parallel.For or Parallel.ForEach routine in a very similar manner to our PLINQ query. As an example, we could rewrite our Parallel.ForEach loop from Part 2 to support cancellation by changing it to:
try
{
    var cts = new CancellationTokenSource();
    var options = new ParallelOptions()
    {
        CancellationToken = cts.Token
    };

    Parallel.ForEach(customers, options, customer =>
    {
        // Run some process that takes some time...
        DateTime lastContact = theStore.GetLastContact(customer);
        TimeSpan timeSinceContact = DateTime.Now - lastContact;

        // Check for cancellation here
        options.CancellationToken.ThrowIfCancellationRequested();

        // If it's been more than two weeks, send an email, and update...
        if (timeSinceContact.Days > 14)
        {
            theStore.EmailCustomer(customer);
            customer.LastEmailContact = DateTime.Now;
        }
    });
}
catch (OperationCanceledException)
{
    // The loop was cancelled
}
Notice that here we use the same approach taken in PLINQ. The Task Parallel Library will automatically handle our cancellation in the same manner as PLINQ, providing a clean, unified model for cancellation of any parallel routine. The TPL also performs the same aggregation of the cancellation exceptions as PLINQ, which is why a single exception handler for OperationCanceledException will cleanly handle this scenario. This works because we’re using the same CancellationToken provided in the ParallelOptions. If a different exception were thrown by one thread, or a CancellationToken from a different CancellationTokenSource were used to raise our exception, we would instead receive all of our individual exceptions merged into one AggregateException.
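The difference between the loop's own token and a token from another source can be sketched like this. The names (ParallelLoopCancellationDemo, Run, loopCts, otherCts) are hypothetical, and the cancel request is issued from inside the loop body only to keep the example deterministic:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelLoopCancellationDemo
{
    // Runs a loop whose body throws a cancellation exception, using either the
    // loop's own token or a token from an unrelated source, and reports which
    // exception type reached the caller.
    public static string Run(bool useLoopToken)
    {
        using (var loopCts = new CancellationTokenSource())
        using (var otherCts = new CancellationTokenSource())
        {
            var options = new ParallelOptions { CancellationToken = loopCts.Token };
            CancellationTokenSource sourceToCancel = useLoopToken ? loopCts : otherCts;

            try
            {
                Parallel.ForEach(Enumerable.Range(0, 100), options, i =>
                {
                    // Request cancellation once, from inside the loop
                    if (i == 10)
                        sourceToCancel.Cancel();

                    sourceToCancel.Token.ThrowIfCancellationRequested();
                });
                return "completed";
            }
            catch (OperationCanceledException)
            {
                // The exception matched the token in ParallelOptions
                return "OperationCanceledException";
            }
            catch (AggregateException)
            {
                // The exception came from a token the loop does not know about
                return "AggregateException";
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine("Loop token:  {0}", Run(true));
        Console.WriteLine("Other token: {0}", Run(false));
    }
}
```

In practice this means the token checked inside the loop body should be the one supplied through ParallelOptions, as in the customer example above.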