AWS .NET SDK v2: the message-pump pattern
- by Elton Stoneman
Originally posted on: http://geekswithblogs.net/EltonStoneman/archive/2013/10/11/aws-.net-sdk-v2--the-message-pump-pattern.aspx
Version 2 of the AWS SDK for .NET has had a few pre-release iterations on NuGet and is stable, if a bit lacking in step-by-step guides. There’s at least one big reason to try it out: the SQS queue client now supports asynchronous reads, so you don’t need a clumsy polling mechanism to retrieve messages.
The new approach is easy to use, and lets you work with AWS queues in a similar way to the message-pump pattern used in the latest Azure SDK for Service Bus queues and topics.
I’ve posted a simple wrapper class for subscribing to an SQS queue as a gist: “A wrapper for the SQS client in the AWS SDK for .NET v2, which uses the message-pump pattern”.
Here’s the core functionality in the subscribe method:
private async void Subscribe()
{
    if (_isListening)
    {
        // Ask for up to 10 messages per call, the most SQS allows.
        var request = new ReceiveMessageRequest { QueueUrl = QueueUrl, MaxNumberOfMessages = 10 };
        var result = await _sqsClient.ReceiveMessageAsync(request, _cancellationTokenSource.Token);
        foreach (var message in result.Messages)
        {
            if (_receiveAction != null && message != null)
            {
                // Pass the whole message so callers can read x.Body, as in the usage below.
                _receiveAction(message);
                // Delete once handled; undeleted messages reappear after the visibility timeout.
                DeleteMessage(message.ReceiptHandle);
            }
        }
    }
    if (_isListening)
    {
        // Still subscribed: go round again for the next batch.
        Subscribe();
    }
}
which you call with something like this:
client.Subscribe(x=>Log.Debug(x.Body));
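The async SDK call completes when SQS responds, and the wrapper runs your receive action for every message in the batch. The wrapper asks for 10 messages per call, which is the maximum SQS allows (the SQS default is 1). If the response is empty, the pump simply issues the next receive call; with long polling enabled on the queue or the request, SQS holds the call open for up to 20 seconds waiting for messages to arrive.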
The listener will sit there awaiting messages until you stop it with:
client.Unsubscribe();
Internally it has a cancellation token source, which it cancels when you call Unsubscribe(). That cancels any in-flight call to SQS and stops the pump.
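To make the cancellation behaviour concrete, here is a minimal sketch of the same pump written as a loop rather than a recursive async void method. It is not the code from the gist: MessagePump, the receiveBatch delegate (which stands in for the call to ReceiveMessageAsync) and UnsubscribeAsync are all hypothetical names, chosen so the example runs without the AWS SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the wrapper. 'receiveBatch' plays the role of
// the SQS receive call; it is an assumption, not part of the AWS SDK.
public sealed class MessagePump
{
    private readonly Func<CancellationToken, Task<IReadOnlyList<string>>> _receiveBatch;
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private Task _loop = Task.CompletedTask;

    public MessagePump(Func<CancellationToken, Task<IReadOnlyList<string>>> receiveBatch)
    {
        _receiveBatch = receiveBatch;
    }

    // Starts the pump on the thread pool; 'onMessage' runs once per message body.
    public void Subscribe(Action<string> onMessage)
    {
        _loop = Task.Run(() => RunAsync(onMessage, _cts.Token));
    }

    // Cancels any in-flight receive, then waits for the loop to exit.
    public async Task UnsubscribeAsync()
    {
        _cts.Cancel();
        await _loop;
    }

    private async Task RunAsync(Action<string> onMessage, CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                // Completes when the receive call returns a batch (possibly empty).
                var batch = await _receiveBatch(token);
                foreach (var body in batch)
                {
                    onMessage(body);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the token is cancelled while a receive is pending.
        }
    }
}
```

Returning a Task from the loop, instead of using async void, means the cancellation exception is caught in one place and the caller can wait for the pump to finish before shutting down.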
The wrapper will also create the queue if it doesn’t exist at runtime. The Ensure() method gets called in the constructor so when you first use the client for a queue (sending or subscribing), it will set itself up:
if (!Exists())
{
    var request = new CreateQueueRequest();
    request.QueueName = QueueName;
    var response = _sqsClient.CreateQueue(request);
    QueueUrl = response.QueueUrl;
}
The Exists() check has to make a call to ListQueues on the SQS client, as the client doesn’t provide its own method to check if a queue exists. That call also populates the Amazon Resource Name, the unique identifier for this queue, which will be useful later.
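One thing to watch with a ListQueues-based check: the name filter on ListQueues is a prefix match, so asking for “ProcessWorkflow” can also return a queue called “ProcessWorkflowArchive”. The wrapper’s own Exists() isn’t shown here, so the following is only a sketch of how the returned URLs could be narrowed to an exact match. QueueLookup and FindExactQueueUrl are hypothetical names, and the account ID in the usage note is a placeholder.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class QueueLookup
{
    // Returns the URL whose final path segment equals queueName exactly,
    // or null if none matches. SQS queue names are case-sensitive.
    public static string FindExactQueueUrl(IEnumerable<string> queueUrls, string queueName)
    {
        return queueUrls.FirstOrDefault(url =>
            string.Equals(
                new Uri(url).Segments.Last().TrimEnd('/'),
                queueName,
                StringComparison.Ordinal));
    }
}
```

Given the URLs https://sqs.us-east-1.amazonaws.com/123456789012/ProcessWorkflowArchive and https://sqs.us-east-1.amazonaws.com/123456789012/ProcessWorkflow, calling QueueLookup.FindExactQueueUrl(urls, "ProcessWorkflow") picks the second one, and the queue exists only if the result is not null.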
To use the wrapper, just instantiate and go:
var queueClient = new QueueClient("ProcessWorkflow");
queueClient.Subscribe(x => Log.Debug(x.Body));
var message = new { }; // etc.
queueClient.Send(message);