How to add correct cancellation when downloading a file with the example in the samples of the new P
Posted
by Mike
on Stack Overflow
See other posts from Stack Overflow
or by Mike
Published on 2009-12-11T16:12:36Z
Indexed on
2010/05/08
4:18 UTC
Read the original article
Hit count: 390
.NET
|parallel-programming
Hello everybody,
I have downloaded the last samples of the Parallel Programming team, and I don't succeed in adding correctly the possibility to cancel the download of a file.
Here is the code I ended to have:
var wreq = (HttpWebRequest)WebRequest.Create(uri);
// Fire start event
DownloadStarted(this, new DownloadStartedEventArgs(remoteFilePath));
long totalBytes = 0;
wreq.DownloadDataInFileAsync(tmpLocalFile,
cancellationTokenSource.Token,
allowResume,
totalBytesAction =>
{
totalBytes = totalBytesAction;
},
readBytes =>
{
Log.Debug("Progression : {0} / {1} => {2}%", readBytes, totalBytes, 100 * (double)readBytes / totalBytes);
DownloadProgress(this, new DownloadProgressEventArgs(remoteFilePath, readBytes, totalBytes, (int)(100 * readBytes / totalBytes)));
})
.ContinueWith( (antecedent ) =>
{
if (antecedent.IsFaulted)
Log.Debug(antecedent.Exception.Message);
//Fire end event
SetEndDownload(antecedent.IsCanceled, antecedent.Exception, tmpLocalFile, 0);
}, cancellationTokenSource.Token);
I want to fire an end event after the download is finished, hence the ContinueWith.
I slightly changed the code of the samples to add the CancellationToken and the 2 delegates to get the size of the file to download, and the progression of the download:
return webRequest.GetResponseAsync()
.ContinueWith(response =>
{
if (totalBytesAction != null)
totalBytesAction(response.Result.ContentLength);
response.Result.GetResponseStream().WriteAllBytesAsync(filePath, ct, resumeDownload, progressAction).Wait(ct);
}, ct);
I had to add the call to the Wait function, because if I don't, the method exits and the end event is fired too early.
Here are the modified method extensions (lot of code, apologies :p)
public static Task WriteAllBytesAsync(this Stream stream, string filePath, CancellationToken ct, bool resumeDownload = false, Action<long> progressAction = null)
{
if (stream == null) throw new ArgumentNullException("stream");
// Copy from the source stream to the memory stream and return the copied data
return stream.CopyStreamToFileAsync(filePath, ct, resumeDownload, progressAction);
}
public static Task CopyStreamToFileAsync(this Stream source, string destinationPath, CancellationToken ct, bool resumeDownload = false, Action<long> progressAction = null)
{
if (source == null) throw new ArgumentNullException("source");
if (destinationPath == null) throw new ArgumentNullException("destinationPath");
// Open the output file for writing
var destinationStream = FileAsync.OpenWrite(destinationPath);
// Copy the source to the destination stream, then close the output file.
return CopyStreamToStreamAsync(source, destinationStream, ct, progressAction).ContinueWith(t =>
{
var e = t.Exception;
destinationStream.Close();
if (e != null)
throw e;
}, ct, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
}
public static Task CopyStreamToStreamAsync(this Stream source, Stream destination, CancellationToken ct, Action<long> progressAction = null)
{
if (source == null) throw new ArgumentNullException("source");
if (destination == null) throw new ArgumentNullException("destination");
return Task.Factory.Iterate(CopyStreamIterator(source, destination, ct, progressAction));
}
private static IEnumerable<Task> CopyStreamIterator(Stream input, Stream output, CancellationToken ct, Action<long> progressAction = null)
{
// Create two buffers. One will be used for the current read operation and one for the current
// write operation. We'll continually swap back and forth between them.
byte[][] buffers = new byte[2][] { new byte[BUFFER_SIZE], new byte[BUFFER_SIZE] };
int filledBufferNum = 0;
Task writeTask = null;
int readBytes = 0;
// Until there's no more data to be read or cancellation
while (true)
{
ct.ThrowIfCancellationRequested();
// Read from the input asynchronously
var readTask = input.ReadAsync(buffers[filledBufferNum], 0, buffers[filledBufferNum].Length);
// If we have no pending write operations, just yield until the read operation has
// completed. If we have both a pending read and a pending write, yield until both the read
// and the write have completed.
yield return writeTask == null
? readTask
: Task.Factory.ContinueWhenAll(new[]
{
readTask,
writeTask
},
tasks => tasks.PropagateExceptions());
// If no data was read, nothing more to do.
if (readTask.Result <= 0)
break;
readBytes += readTask.Result;
if (progressAction != null)
progressAction(readBytes);
// Otherwise, write the written data out to the file
writeTask = output.WriteAsync(buffers[filledBufferNum], 0, readTask.Result);
// Swap buffers
filledBufferNum ^= 1;
}
}
So basically, at the end of the chain of called methods, I let the CancellationToken throw an OperationCanceledException if a Cancel has been requested.
What I hoped was to get IsFaulted == true in the appealing code and to fire the end event with the canceled flags and the correct exception.
But what I get is an unhandled exception on the line
response.Result.GetResponseStream().WriteAllBytesAsync(filePath, ct, resumeDownload, progressAction).Wait(ct);
telling me that I don't catch an AggregateException. I've tried various things, but I don't succeed to make the whole thing work properly.
Does anyone of you have played enough with that library and may help me?
Thanks in advance
Mike
© Stack Overflow or respective owner