Windows Azure AppFabric: ServiceBus Queue WPF Sample
- by xamlnotes
The latest version of the AppFabric ServiceBus now supports queues and topics. Today I will show you a bit about using queues and also talk about some of the best practices for using them. If you are just getting started, you can check out this site for more info on Windows Azure.

One of the first things I thought of when Azure was announced was how we would handle fault tolerance. Web sites hosted in Azure are not much of an issue unless they use SQL Azure, in which case you must account for potential faults or latency. Today I want to talk a bit about ServiceBus and how to handle fault tolerance there. There are also things like connecting to the ServiceBus that you have to take care of. To demonstrate some of the things you can do, let me walk through this sample WPF app that I am posting for you to download.

To start off, the application needs things like the service namespace, the issuer details and so forth to make everything work. To facilitate this I created settings in the WPF app for all of these items. Then I mapped a static class to them and set the values when the program loads, like so:

    StaticElements.ServiceNamespace = Convert.ToString(Properties.Settings.Default["ServiceNamespace"]);
    StaticElements.IssuerName = Convert.ToString(Properties.Settings.Default["IssuerName"]);
    StaticElements.IssuerKey = Convert.ToString(Properties.Settings.Default["IssuerKey"]);
    StaticElements.QueueName = Convert.ToString(Properties.Settings.Default["QueueName"]);

Now I can get to each of these elements, plus some other common values or instances, directly from the StaticElements class.

Now, let's look at the application. The application looks like this when it starts: The blue graphic represents the queue we are going to use. The next figure shows the form after items were added and the queue stats were updated.
You can see how the queue has grown:

To add an item to the queue, click the Add Order button, which displays the following dialog:

After you fill in the form and press OK, the order is published to the ServiceBus queue and the form closes. The application also allows you to read the queued items by clicking the Process Orders button. As you can see below, the form shows the queued items in a list and the queue graphic has disappeared because the queue is now empty. In real practice we would normally use a Windows Service or some other automated process to subscribe to the queue and pull items from it.

I created a class named ServiceBusQueueHelper that has the core queue features we need. There are three public methods:

* GetOrCreateQueue – Gets an instance of the queue description if the queue exists. If not, it creates the queue and returns a description instance.
* SendMessageToQueue – Takes an order instance and sends it to the queue. The call to the queue is wrapped in the ExecuteAction method from the Transient Fault Handling Framework, which handles all the retry logic for the send.
* GetOrderFromQueue – Receives a message from the queue and returns its body as a typed order. It also marks the message complete so the queue can remove it.

Now let's turn to the WPF window code (MainWindow.xaml.cs). The constructor contains the 4 lines shown above to set up the static variables and to perform other initialization tasks.
The next few lines set up the objects we need for the ServiceBus:

    TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider(StaticElements.IssuerName, StaticElements.IssuerKey);
    Uri serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", StaticElements.ServiceNamespace, string.Empty);
    StaticElements.CurrentNamespaceManager = new NamespaceManager(serviceUri, credentials);
    StaticElements.CurrentMessagingFactory = MessagingFactory.Create(serviceUri, credentials);

The next two lines update the queue name label and set the timer interval to 20 seconds.

    QueueNameLabel.Content = StaticElements.QueueName;
    _timer.Interval = TimeSpan.FromSeconds(20);

Next I call UpdateQueueStats to initialize the UI for the queue, then hook the same method to the timer so the stats refresh on every tick:

        UpdateQueueStats();
        _timer.Tick += new EventHandler(delegate(object s, EventArgs a) { UpdateQueueStats(); });
        _timer.Start();
    } // end of the MainWindow constructor

The UpdateQueueStats method is shown below. You can see that it uses the GetOrCreateQueue method mentioned earlier to grab the queue description, and then reads the MessageCount property from it.

    private void UpdateQueueStats()
    {
        _queueDescription = _serviceBusQueueHelper.GetOrCreateQueue();
        long count = _queueDescription.MessageCount;
        QueueCountLabel.Content = "(" + count + ")";

        // Draw the queue graphic 20 units wide per queued message.
        long queueWidth = count * 20;
        QueueRectangle.Width = queueWidth;

        // Show how many times the stats have been refreshed.
        QueueTickCount += 1;
        TickCountlabel.Content = QueueTickCount.ToString();
    }

The ReadQueueItemsButton_Click event handler calls the GetOrderFromQueue method and adds the order to the listbox. If you look at the SendQueueMessageController, you can see the SendMessage method that sends an order to the queue. It's pretty simple: it just creates a new CustomerOrderEntity instance, fills it and then passes it to SendMessageToQueue. As you can see, all of our interaction with the queue is done through the helper class (ServiceBusQueueHelper).

Now let's dig into the helper class.
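The click handler and the SendMessage method described above are not listed in this post, so here is a rough sketch of how those two calls might fit together. Treat it as an illustration only: the IOrderQueue interface, the InMemoryOrderQueue stand-in, the OrderReader helper, the controller's constructor and the CustomerName and Quantity properties are all assumptions I made up so the sketch runs without a ServiceBus namespace. The real sample talks to ServiceBusQueueHelper directly.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical order shape; the real CustomerOrderEntity fields are not shown in this post.
public class CustomerOrderEntity
{
    public string CustomerName { get; set; }
    public int Quantity { get; set; }
}

// Hypothetical abstraction over the two helper methods the UI calls.
public interface IOrderQueue
{
    void SendMessageToQueue(CustomerOrderEntity order);
    CustomerOrderEntity GetOrderFromQueue();
}

// In-memory stand-in for the helper so the sketch runs without Azure.
// Unlike the real helper, it returns null when the queue is empty.
public class InMemoryOrderQueue : IOrderQueue
{
    private readonly Queue<CustomerOrderEntity> _orders = new Queue<CustomerOrderEntity>();

    public int Count { get { return _orders.Count; } }

    public void SendMessageToQueue(CustomerOrderEntity order) { _orders.Enqueue(order); }

    public CustomerOrderEntity GetOrderFromQueue()
    {
        return _orders.Count > 0 ? _orders.Dequeue() : null;
    }
}

// Roughly what SendMessage does: build an order, fill it, hand it to the helper.
public class SendQueueMessageController
{
    private readonly IOrderQueue _queue;

    public SendQueueMessageController(IOrderQueue queue) { _queue = queue; }

    public void SendMessage(string customerName, int quantity)
    {
        var order = new CustomerOrderEntity { CustomerName = customerName, Quantity = quantity };
        _queue.SendMessageToQueue(order);
    }
}

public static class OrderReader
{
    // Roughly what the Process Orders click handler does:
    // pull one order from the queue and add a display line to the list.
    public static bool ReadOneInto(IOrderQueue queue, IList<string> listItems)
    {
        CustomerOrderEntity order = queue.GetOrderFromQueue();
        if (order == null) return false;
        listItems.Add(order.CustomerName + " x " + order.Quantity);
        return true;
    }
}
```

Putting the queue behind a small interface like this is a design choice of the sketch, not of the sample, but it does make it easy to exercise the UI logic without a live queue.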
First, before you create anything like this, download the Transient Fault Handling Framework. Microsoft provides this for free and also provides the C# source. There's a great article that shows how to use this framework with ServiceBus. I included the entire ServiceBusQueueHelper class in Listing 1. Notice the using statements for TransientFaultHandling:

    using Microsoft.AzureCAT.Samples.TransientFaultHandling;
    using Microsoft.AzureCAT.Samples.TransientFaultHandling.ServiceBus;

The SendMessageToQueue method in Listing 1 shows how to use the async send features of ServiceBus wrapped in the Transient Fault Handling Framework. It is not much different from plain old ServiceBus calls, but it sure makes it easy to get the fault tolerance added almost for free.

The GetOrderFromQueue method uses the standard synchronous methods to access the queue. The best practices article walks through using the async approach for a receive operation as well. Notice that this method calls Receive to get the message, then calls GetBody to get a new strongly typed instance of CustomerOrderEntity to return.

Listing 1

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using Microsoft.AzureCAT.Samples.TransientFaultHandling;
    using Microsoft.AzureCAT.Samples.TransientFaultHandling.ServiceBus;
    using Microsoft.ServiceBus;
    using Microsoft.ServiceBus.Messaging;
    using System.Xml.Serialization;
    using System.Diagnostics;

    namespace WPFServicebusPublishSubscribeSample
    {
        class ServiceBusQueueHelper
        {
            RetryPolicy currentPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(RetryPolicy.DefaultClientRetryCount);
            QueueClient currentQueueClient;

            public QueueDescription GetOrCreateQueue()
            {
                QueueDescription queue = null;
                bool createNew = false;

                try
                {
                    // First, let's see if a queue with the specified name already exists.
                    queue = currentPolicy.ExecuteAction<QueueDescription>(() =>
                    {
                        return StaticElements.CurrentNamespaceManager.GetQueue(StaticElements.QueueName);
                    });
                    createNew = (queue == null);
                }
                catch (MessagingEntityNotFoundException)
                {
                    // Looks like the queue does not exist. We should create a new one.
                    createNew = true;
                }

                // If a queue with the specified name doesn't exist, it will be auto-created.
                if (createNew)
                {
                    try
                    {
                        var newqueue = new QueueDescription(StaticElements.QueueName);
                        queue = currentPolicy.ExecuteAction<QueueDescription>(() =>
                        {
                            return StaticElements.CurrentNamespaceManager.CreateQueue(newqueue);
                        });
                    }
                    catch (MessagingEntityAlreadyExistsException)
                    {
                        // A queue under the same name was already created by someone else,
                        // perhaps by another instance. Let's just use it.
                        queue = currentPolicy.ExecuteAction<QueueDescription>(() =>
                        {
                            return StaticElements.CurrentNamespaceManager.GetQueue(StaticElements.QueueName);
                        });
                    }
                }

                currentQueueClient = StaticElements.CurrentMessagingFactory.CreateQueueClient(StaticElements.QueueName);
                return queue;
            }

            public void SendMessageToQueue(CustomerOrderEntity Order)
            {
                BrokeredMessage msg = null;

                // Make sure the queue exists and currentQueueClient is initialized.
                GetOrCreateQueue();

                // Use a retry policy to execute the Send action in an asynchronous and reliable fashion.
                currentPolicy.ExecuteAction
                (
                    (cb) =>
                    {
                        // A new BrokeredMessage instance must be created each time we send it. Reusing the original BrokeredMessage instance may not
                        // work as the state of its BodyStream cannot be guaranteed to be readable from the beginning.
                        msg = new BrokeredMessage(Order);

                        // Send the message asynchronously.
                        currentQueueClient.BeginSend(msg, cb, null);
                    },
                    (ar) =>
                    {
                        try
                        {
                            // Complete the asynchronous operation.
                            // This may throw an exception that will be handled internally by the retry policy.
                            currentQueueClient.EndSend(ar);
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            if (msg != null)
                            {
                                msg.Dispose();
                                msg = null;
                            }
                        }
                    },
                    (ex) =>
                    {
                        // Always dispose the BrokeredMessage instance even if the send
                        // operation has completed unsuccessfully.
                        if (msg != null)
                        {
                            msg.Dispose();
                            msg = null;
                        }

                        // Always log exceptions.
                        Trace.TraceError(ex.Message);
                    }
                );
            }

            public CustomerOrderEntity GetOrderFromQueue()
            {
                CustomerOrderEntity Order = new CustomerOrderEntity();
                QueueClient myQueueClient = StaticElements.CurrentMessagingFactory.CreateQueueClient(StaticElements.QueueName, ReceiveMode.PeekLock);

                // Refresh the queue description so we only try to receive when something is queued.
                QueueDescription queueDescription = GetOrCreateQueue();

                if (queueDescription.MessageCount > 0)
                {
                    BrokeredMessage message = myQueueClient.Receive(TimeSpan.FromSeconds(90));
                    if (message != null)
                    {
                        // If GetBody throws, the exception propagates to the caller and the message
                        // is not completed, so it becomes visible again once its peek lock expires.
                        Order = message.GetBody<CustomerOrderEntity>();
                        message.Complete();
                    }
                    else
                    {
                        throw new Exception("Did not receive the messages");
                    }
                }

                // An empty CustomerOrderEntity is returned when the queue has no messages.
                return Order;
            }
        }
    }

I will post a link to the download demo in a separate post soon.
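As a footnote for anyone who has not used a retry framework before, here is a minimal, self-contained sketch of the idea behind wrapping a call in ExecuteAction. It is not the Transient Fault Handling Framework's implementation: SimpleRetry, its parameters and the isTransient callback are names I made up for illustration, and the real framework does considerably more than this.

```csharp
using System;
using System.Threading;

// Hypothetical helper that only illustrates the retry idea; not the framework's code.
public static class SimpleRetry
{
    // Runs action and retries it up to maxRetries times when isTransient says
    // the failure is worth retrying, waiting a fixed delay between attempts.
    public static T Execute<T>(Func<T> action, Func<Exception, bool> isTransient, int maxRetries, TimeSpan delay)
    {
        int attempt = 0;
        while (true)
        {
            try
            {
                return action();
            }
            catch (Exception ex)
            {
                // Give up on non-transient errors or once the retry budget is spent.
                if (!isTransient(ex) || attempt >= maxRetries)
                {
                    throw;
                }
                attempt++;
                Thread.Sleep(delay);
            }
        }
    }
}

public static class RetryDemo
{
    // Simulates a send that fails twice with a transient fault and then succeeds.
    public static int CountAttemptsUntilSuccess()
    {
        int calls = 0;
        SimpleRetry.Execute(
            () =>
            {
                calls++;
                if (calls < 3) throw new TimeoutException("simulated transient fault");
                return "sent";
            },
            ex => ex is TimeoutException,
            5,
            TimeSpan.FromMilliseconds(10));
        return calls;
    }
}
```

The point of the sketch is the split between the action and the decision about which exceptions are transient. In Listing 1 that decision is made by ServiceBusTransientErrorDetectionStrategy, so the calling code does not have to know which ServiceBus errors are safe to retry.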