I had a spare hour this afternoon, so I wanted to have another play with Reactive Extensions in .NET and StreamInsight. I also didn’t want to simply use a console window as a way of gathering events, so I decided to use a Windows form instead.

The task I set myself was this. Whenever I click on my form, I want to subscribe to the event and output its location and timestamp to the console window. In addition, for every mouse click, I want to know how many mouse clicks have happened in the last 5 seconds.

The second point here is really interesting. I have often found this when working with people on problems: how you ask the question determines how you tackle the problem. I will show 2 possible ways of answering the second question, depending on how the question is interpreted. As a side effect of this example, I will show how time in StreamInsight can stand still. This is an important concept, and we can see it in the output later.

Now to the code. I will break it all down in this blog post, but you can download the solution and see it all together. I created a Console application and then instantiated a Windows form.
frm = new Form();
Thread g = new Thread(CallUI);
g.SetApartmentState(ApartmentState.STA);
g.Start();
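CallUI looks like this:
static void CallUI()
{
    // Application.Run blocks until the form is closed, so activate the form
    // from its Shown event instead of after Run returns.
    frm.Shown += (sender, e) =>
    {
        frm.Activate();
        frm.BringToFront();
    };
    System.Windows.Forms.Application.Run(frm);
}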
Now what we need to do is create an observable from the MouseClick event on the form. For this we use the Reactive Extensions.
var lblevt = Observable.FromEvent<MouseEventArgs>(frm, "MouseClick").Timestamp();
As mentioned earlier, I have two objectives in this example, and to solve the first I am again going to use the Reactive Extensions. Let’s subscribe to the MouseClick event and output the location and timestamp to the console.
lblevt.Subscribe(evt =>
{
    Console.WriteLine("Clicked: {0}, {1} ", evt.Value.EventArgs.Location, evt.Timestamp);
});
That should take care of objective #1, but what about the second objective? For that we need some temporal windowing, and this means StreamInsight. First we need to turn our observable sequence of MouseClick events into a PointStream.
Server s = Server.Create("Default");
Microsoft.ComplexEventProcessing.Application a = s.CreateApplication("MouseClicks");
var input = lblevt.ToPointStream(
    a,
    evt => PointEvent.CreateInsert(
        evt.Timestamp,
        new {
            loc = evt.Value.EventArgs.Location.ToString(),
            ts = evt.Timestamp.ToLocalTime().ToString()
        }),
    AdvanceTimeSettings.IncreasingStartTime);
Now that we have created our PointStream, we need to do something with it, and this is where we get to our second objective. It is pretty clear that we want some kind of windowing, but what kind?
Here is one way of doing it. It might not be what you wanted, but again it depends on how the second objective is interpreted.
var q = from i in input.TumblingWindow(TimeSpan.FromSeconds(5), HoppingWindowOutputPolicy.ClipToWindowEnd)
select new { CountOfClicks = i.Count() };
The above code creates tumbling windows of 5 seconds and counts the number of events in each window. If a window contains no events, no result is output. Likewise, we see nothing in the output until a MouseClick is issued. (Strictly speaking, it is the CTI, or Current Time Increment, attached to each MouseClick event that flushes events through the StreamInsight engine, not the events themselves.) This approach is centred around the windows and not the events: until a window completes and a CTI is issued, no results are pushed through.
An alternate way of answering our second question is below:
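To make the window-centred behaviour concrete, here is a minimal sketch in plain C# (no StreamInsight involved) of which tumbling windows a CTI would release. It assumes windows aligned to multiples of five seconds and uses click times of 16, 18, 20, 22, 24 and 32 seconds past the minute, the same ones that appear in the output further down. The class and method names are made up for illustration, and this is only my approximation of the engine's behaviour, not its implementation.

```csharp
using System;
using System.Linq;

public static class TumblingWindowSketch
{
    // Counts clicks per tumbling window and keeps only the windows that end
    // at or before the given CTI. Windows with no clicks produce no output.
    // Assumption: windows are aligned to multiples of `size` seconds.
    public static (int Start, int Count)[] CompletedWindows(int[] clicks, int size, int cti)
    {
        return clicks
            .GroupBy(c => c - c % size)          // start of the window holding each click
            .Where(g => g.Key + size <= cti)     // the CTI has passed the window's end
            .OrderBy(g => g.Key)
            .Select(g => (Start: g.Key, Count: g.Count()))
            .ToArray();
    }

    public static void Main()
    {
        // Click times in seconds past the minute.
        int[] clicks = { 16, 18, 20, 22, 24, 32 };

        // With IncreasingStartTime the latest CTI sits at the last click's start time.
        int cti = clicks.Max();

        foreach (var w in CompletedWindows(clicks, 5, cti))
            Console.WriteLine($"[{w.Start}, {w.Start + 5}) -> {w.Count}");
    }
}
```

Note that the click at 32 falls in a window that has not yet completed, so under these assumptions its count would stay hidden until a later click moves the CTI past second 35.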
var q = from i in input.AlterEventDuration(evt => TimeSpan.FromSeconds(5)).SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { CountOfClicks = i.Count() };
In this code we extend the duration of each MouseClick to five seconds. We then create Snapshot Windows over those events. Snapshot windows are discussed in detail here. With this solution we are centred around the events. It is the events that are driving the output. Let’s have a look at the output from this solution as it may be a little confusing.
First though let me show how we get the output from StreamInsight into the Console window.
foreach (var x in q.ToPointEnumerable().Where(e => e.EventKind != EventKind.Cti))
{
    Console.WriteLine(x.Payload.CountOfClicks);
}
Ok so now to the output.
The table at the top shows the output from our routine, and the table at the bottom helps to explain the output. It also helps to remember that for our PointStream we set the issuing of CTIs to IncreasingStartTime. This means the CTI is placed right at the start time of the event, so it will not flush the event with which it was issued but will flush those prior to it.
In the bottom table the Blue fill is where we issued a click. Yellow fill is the duration and boundaries of our events. The numbers at the bottom indicate the count of events.
Clicked     Counts output after the click
22:40:16    (none)
22:40:18    1
22:40:20    2
22:40:22    3, 2
22:40:24    3, 2
22:40:32    3, 2, 1
secs      16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
counts    1, 2, 3, 2, 3, 2, 3, 2, 1 (in the order the snapshot windows occur)
What we can see here in the output is that the counts include all the end edges that have occurred between the mouse clicks.
If we look specifically at the mouse click at 22:40:32, we see that 3 events are returned to us. These are the following:
End Edge count at 22:40:25
End Edge count at 22:40:27
End Edge count at 22:40:29
Another thing we notice is that until we actually issue a CTI at 22:40:32, those last 3 snapshot window counts are not reported. This is what I meant by time standing still: without a new CTI, time inside StreamInsight does not move forward, however long we wait.
Hopefully this has helped to explain a few concepts around StreamInsight and the IObservable<T> pattern.
You can download this solution from here and play. You will need the Reactive Framework from here and StreamInsight 1.1.
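The arithmetic behind that output can be sketched in plain C#, without StreamInsight. The sketch stretches each click to a 5 second interval, cuts snapshot windows at every start and end edge, and then releases the windows that have ended by the time each click's CTI arrives. The click times are the seconds past 22:40 from the output above. The class and method names are my own, and this is only an approximation of how I understand the engine to behave.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SnapshotWindowSketch
{
    // Each click becomes the interval [click, click + duration). Snapshot windows
    // run between consecutive distinct edges (starts and ends) of those intervals.
    public static List<(int Start, int End, int Count)> Windows(int[] clicks, int duration)
    {
        int[] edges = clicks.SelectMany(c => new[] { c, c + duration })
                            .Distinct()
                            .OrderBy(e => e)
                            .ToArray();

        var result = new List<(int Start, int End, int Count)>();
        for (int i = 0; i + 1 < edges.Length; i++)
        {
            int start = edges[i], end = edges[i + 1];
            // Number of stretched events that overlap this window.
            int count = clicks.Count(c => c <= start && start < c + duration);
            if (count > 0)                       // empty windows emit nothing
                result.Add((start, end, count));
        }
        return result;
    }

    public static void Main()
    {
        int[] clicks = { 16, 18, 20, 22, 24, 32 };   // seconds past 22:40
        var windows = Windows(clicks, 5);

        int previousCti = int.MinValue;
        foreach (int cti in clicks)   // IncreasingStartTime: one CTI per click, at its start time
        {
            // A CTI releases every window that ended since the previous CTI.
            var released = windows.Where(w => w.End > previousCti && w.End <= cti)
                                  .Select(w => w.Count);
            Console.WriteLine($"Click at {cti}: {string.Join(", ", released)}");
            previousCti = cti;
        }
    }
}
```

The window starting at 32 never appears in the loop's output because no later CTI arrives to release it, which is the same standing-still effect described above.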