AWS .NET SDK v2: the message-pump pattern
Posted
by Elton Stoneman
on Geeks with Blogs
See other posts from Geeks with Blogs
or by Elton Stoneman
Published on Fri, 11 Oct 2013 14:20:04 GMT
Indexed on
2013/10/17
16:01 UTC
Read the original article
Hit count: 427
aws
Originally posted on: http://geekswithblogs.net/EltonStoneman/archive/2013/10/11/aws-.net-sdk-v2--the-message-pump-pattern.aspx
Version 2 of the AWS SDK for .NET has had a few pre-release iterations on NuGet and is stable, if a bit lacking in step-by-step guides. There’s at least one big reason to try it out: the SQS queue client now supports asynchronous reads, so you don’t need a clumsy polling mechanism to retrieve messages.
The new approach is easy to use, and lets you work with AWS queues in a similar way to the message-pump pattern used in the latest Azure SDK for Service Bus queues and topics.
I’ve posted a simple wrapper class for subscribing to an SQS hub on gist here: A wrapper for the SQS client in the AWS SDK for.NET v2, which uses the message-pump pattern.
Here’s the core functionality in the subscribe method:
private async void Subscribe() { if (_isListening) { var request = new ReceiveMessageRequest { MaxNumberOfMessages = 10 }; request.QueueUrl = QueueUrl; var result = await _sqsClient.ReceiveMessageAsync(request, _cancellationTokenSource.Token); if (result.Messages.Count > 0) { foreach (var message in result.Messages) { if (_receiveAction != null && message != null) { _receiveAction(message.Body); DeleteMessage(message.ReceiptHandle); } } } } if (_isListening) { Subscribe(); } }
which you call with something like this:
client.Subscribe(x=>Log.Debug(x.Body));
The async SDK call returns when there is something in the queue, and will run your receive action for every message it gets in the batch (defaults to the maximum size of 10 messages per call).
The listener will sit there awaiting messages until you stop it with:
client.Unsubscribe();
Internally it has a cancellation token which it sets when you call unsubscribe, which cancels any in-flight call to SQS and stops the pump.
The wrapper will also create the queue if it doesn’t exist at runtime. The Ensure() method gets called in the constructor so when you first use the client for a queue (sending or subscribing), it will set itself up:
if (!Exists()) { var request = new CreateQueueRequest(); request.QueueName = QueueName; var response = _sqsClient.CreateQueue(request); QueueUrl = response.QueueUrl; }
The Exists() check has to do make a call to ListQueues on the SNS client, as it doesn’t provide its own method to check if a queue exists. That call also populates the Amazon Resource Name, the unique identifier for this queue, which will be useful later.
To use the wrapper, just instantiate and go:
var queueClient = new QueueClient(“ProcessWorkflow”); queueClient.Subscribe(x=>Log.Debug(x.Body)); var message = {}; //etc. queueClient.Send(message);
© Geeks with Blogs or respective owner