StreamInsight and Reactive Framework Challenge

Posted on SQLIS See other posts from SQLIS
Published on Sun, 06 Feb 2011 23:09:18 +0100 Indexed on 2011/02/07 7:32 UTC
Read the original article Hit count: 598

In his blogpost Roman from the StreamInsight team asked if we could create a Reactive Framework version of what he had done in the post using StreamInsight.  For those who don’t know, the Reactive Framework or Rx to its friends is a library for composing asynchronous and event-based programs using observable collections in the .Net framework.  Yes, there is some overlap between StreamInsight and the Reactive Extensions but StreamInsight has more flexibility and power in its temporal algebra (Windowing, Alteration of event headers)

Well here are two alternate ways of doing what Roman did.

The first example is a mix of StreamInsight and Rx

var rnd = new Random();
var RandomValue = 0;
var interval = Observable.Interval(TimeSpan.FromMilliseconds((Int32)rnd.Next(500,3000)))
    .Select(i =>
    {
        RandomValue = rnd.Next(300);
        return RandomValue;
    });

Server s = Server.Create("Default");
Microsoft.ComplexEventProcessing.Application a = s.CreateApplication("Rx SI Mischung");

var inputStream = interval.ToPointStream(a, evt =>
                            PointEvent.CreateInsert(
                                    System.DateTime.Now.ToLocalTime(), 
                                    new { RandomValue = evt}), AdvanceTimeSettings.IncreasingStartTime, "Rx Sample");


var r = from evt in inputStream
        select new { runningVal = evt.RandomValue };


foreach (var x in r.ToPointEnumerable().Where(e => e.EventKind != EventKind.Cti))
{
    Console.WriteLine(x.Payload.ToString());
}

This next version though uses the Reactive Extensions Only

 

var rnd = new Random();
var RandomValue = 0;
Observable.Interval(TimeSpan.FromMilliseconds((Int32)rnd.Next(500, 3000)))
    .Select(i =>
    {
        RandomValue = rnd.Next(300);
        return RandomValue;
    }).Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));

Console.ReadKey();

 

These are very simple examples but both technologies allow us to do a lot more.  The ICEPObservable() design pattern was reintroduced in StreamInsight 1.1 and the more I use it the more I like it.  It is a very useful pattern when wanting to show StreamInsight samples as is the IEnumerable() pattern.

© SQLIS or respective owner

Related posts about StreamInsight

Related posts about execution