In his blogpost Roman from the StreamInsight team asked if we could create a Reactive Framework version of what he had done in the post using StreamInsight. For those who don’t know, the Reactive Framework or Rx to its friends is a library for composing asynchronous and event-based programs using observable collections in the .Net framework. Yes, there is some overlap between StreamInsight and the Reactive Extensions but StreamInsight has more flexibility and power in its temporal algebra (Windowing, Alteration of event headers) Well here are two alternate ways of doing what Roman did. The first example is a mix of StreamInsight and Rx var rnd = new Random();
var RandomValue = 0;
var interval = Observable.Interval(TimeSpan.FromMilliseconds((Int32)rnd.Next(500,3000)))
.Select(i =>
{
RandomValue = rnd.Next(300);
return RandomValue;
});
Server s = Server.Create("Default");
Microsoft.ComplexEventProcessing.Application a = s.CreateApplication("Rx SI Mischung");
var inputStream = interval.ToPointStream(a, evt =>
PointEvent.CreateInsert(
System.DateTime.Now.ToLocalTime(),
new { RandomValue = evt}), AdvanceTimeSettings.IncreasingStartTime, "Rx Sample");
var r = from evt in inputStream
select new { runningVal = evt.RandomValue };
foreach (var x in r.ToPointEnumerable().Where(e => e.EventKind != EventKind.Cti))
{
Console.WriteLine(x.Payload.ToString());
}
This next version though uses the Reactive Extensions Only
var rnd = new Random();
var RandomValue = 0;
Observable.Interval(TimeSpan.FromMilliseconds((Int32)rnd.Next(500, 3000)))
.Select(i =>
{
RandomValue = rnd.Next(300);
return RandomValue;
}).Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
Console.ReadKey();
These are very simple examples but both technologies allow us to do a lot more. The ICEPObservable() design pattern was reintroduced in StreamInsight 1.1 and the more I use it the more I like it. It is a very useful pattern when wanting to show StreamInsight samples as is the IEnumerable() pattern.