The issuing of a Current Time Increment (Cti) in StreamInsight is one of the most important concepts to learn if you want your streams to be responsive. A full discussion of how to issue Ctis is beyond the scope of this article, but a very good explanation, in addition to Books Online, can be found in these three articles by a member of the StreamInsight team at Microsoft, Ciprian Gerea.
Time in StreamInsight Series
http://blogs.msdn.com/b/streaminsight/archive/2010/07/23/time-in-streaminsight-i.aspx
http://blogs.msdn.com/b/streaminsight/archive/2010/07/30/time-in-streaminsight-ii.aspx
http://blogs.msdn.com/b/streaminsight/archive/2010/08/03/time-in-streaminsight-iii.aspx
Many of the problems I see with unresponsive or stuck streams on the MSDN Forums are to do with how Ctis are enqueued or, in a lot of cases, not enqueued. If you enqueue events and never enqueue a Cti then StreamInsight will be perfectly happy. You, on the other hand, will never see data on the output as you have not told StreamInsight to flush the stream.
This article deals with a specific implementation problem I had recently whilst working on a StreamInsight project. I look at some possible options and discuss why they would not work before showing the way I solved the problem.
The stream of data I was dealing with on this project was very bursty; that is to say, when events were flowing they came through very quickly and in large numbers (1000 events/sec), but when the stream calmed down it could be a few seconds between each event.
When enqueuing events into the StreamInsight engine it is best practice to do so with a StartTime that is given to you by the system producing the event. StreamInsight processes events, and it doesn't matter whether those events are being pushed into the engine by a source system or are being read from something like a flat file in a directory somewhere. You can apply the same logic and temporal algebra to both situations.
Reading from a file is an excellent example of where the time of the event on the source itself is very important. We could be reading that file a long time after it was written. Being able to read the StartTime from the events allows us to define windows that will hold the correct sets of events. I was able to do this with my stream but this is where my problems started.
Below is a very simple script to create a SQL Server table and populate it with sample data that will show exactly the problem I had.
CREATE TABLE [dbo].[t]
(
[c1] [int] PRIMARY KEY,
[c2] [datetime] NULL
)
INSERT t VALUES (1,'20100810'),(2,'20100810'),(3,'20100810')
Column c2 defines the StartTime of the event on the source and, as you can see, the value in all 3 rows of data is the same.
If we read Ciprian’s articles we know that we can define how Ctis get injected into the stream in 3 different places:
The Stream Definition
The Input Factory
The Input Adapter
I personally have always been a fan of enqueuing Ctis through the factory. Below is code typical of what I would use to do this.
The factory class itself implements two interfaces:
public class SimpleInputFactory : ITypedInputAdapterFactory<SimpleInputConfig>, ITypedDeclareAdvanceTimeProperties<SimpleInputConfig>
And then I implement the following function
public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(SimpleInputConfig configInfo, EventShape eventShape)
{
    return new AdapterAdvanceTimeSettings(
        new AdvanceTimeGenerationSettings(configInfo.CtiFrequency, TimeSpan.FromTicks(-1)),
        AdvanceTimePolicy.Adjust);
}
The configInfo.CtiFrequency property is a value I pass through to define after how many events I want a Cti to be injected, and this in turn will flush through the stream of data. I usually pass a value of 1 for this setting. The second parameter determines the Cti timestamp in terms of a delay relative to the events. A delay of -1 tick places the Cti 1 tick in the future, i.e., ahead of the event. The problem with this method, though, is that if consecutive events have the same StartTime then only one of those events will be enqueued. In this example I use the following to define how I assign the StartTime of my events:
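The delay arithmetic described above can be sketched in a few lines. This is only an illustration of the rule as stated in the text (Cti timestamp = event StartTime minus the delay); the class and method names are hypothetical and are not part of the StreamInsight API.

```csharp
using System;

public static class CtiTimestampSketch
{
    // Hypothetical helper, not a StreamInsight call: models the rule that a
    // generated Cti is stamped at the event's StartTime minus the delay.
    public static DateTimeOffset CtiTimestamp(DateTimeOffset eventStartTime, TimeSpan delay)
    {
        // Subtracting a negative delay moves the Cti forward in time.
        return eventStartTime - delay;
    }
}
```

With a delay of TimeSpan.FromTicks(-1) the result is the StartTime plus 1 tick, and with TimeSpan.Zero it is the StartTime itself.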
currEvent.StartTime = (DateTimeOffset)dt.c2;
If I go ahead and run my StreamInsight process with this configuration I can see on the output adapter that two events have been removed.
To see this in a little more depth I can use the StreamInsight Debugger and see what happens internally.
What is happening here is that the first event arrives and a Cti is injected with a time of 1 tick after the StartTime of that event (which is also the EndTime of the event). The second event arrives with a StartTime before that Cti and, even though we specified AdvanceTimePolicy.Adjust on the factory, a point event can never be adjusted like this, so the event is dropped. The same happens for the third event (the second and third events get trumped by the Cti). For a more detailed discussion of why this happens look here:
http://www.sqlis.com/sqlis/post/AdvanceTimePolicy-and-Point-Event-Streams-In-StreamInsight.aspx
We end up with a single event being pushed into the output adapter and our result now makes sense.
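To make the drop behaviour concrete, here is a minimal simulation of it. It is a deliberately simplified model and not the StreamInsight engine: it assumes a Cti is generated after every event (a frequency of 1) and that a point event whose StartTime is earlier than the latest Cti is discarded. All names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class CtiDropSketch
{
    // Returns the StartTimes of the events that survive under the simplified
    // model described in the lead-in.
    public static List<DateTimeOffset> AcceptedEvents(IEnumerable<DateTimeOffset> startTimes, TimeSpan delay)
    {
        var accepted = new List<DateTimeOffset>();
        DateTimeOffset? lastCti = null;

        foreach (var startTime in startTimes)
        {
            // A point event behind the latest Cti cannot be adjusted, so it is dropped.
            if (lastCti.HasValue && startTime < lastCti.Value)
            {
                continue;
            }

            accepted.Add(startTime);

            // Generate the Cti for this event; Ctis never move backwards.
            var cti = startTime - delay;
            if (!lastCti.HasValue || cti > lastCti.Value)
            {
                lastCti = cti;
            }
        }

        return accepted;
    }
}
```

Feeding it three identical StartTimes with a delay of TimeSpan.FromTicks(-1) leaves one accepted event, which matches the single event seen on the output adapter.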
Next I tried to solve this problem by changing the value of the second parameter to
TimeSpan.Zero
Here is how my factory code now looks
public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(SimpleInputConfig configInfo, EventShape eventShape)
{
    return new AdapterAdvanceTimeSettings(
        new AdvanceTimeGenerationSettings(configInfo.CtiFrequency, TimeSpan.Zero),
        AdvanceTimePolicy.Adjust);
}
What I am doing here is declaring a policy that says: inject a Cti together with every event and stamp it with a timestamp equal to the StartTime of the event itself (TimeSpan.Zero). This method has an upside and a downside. The upside is that no events will be lost by having the same StartTime as previous events. The downside is that, because the Cti carries the StartTime of the event itself, it does not actually flush that particular event: in the StreamInsight algebra, a Cti commits only those events that occurred strictly before it. To flush the events we need a Cti to be enqueued with a greater StartTime than the events themselves. Here is what happened when I ran this configuration.
As you can see, all we got through was the Cti and none of the events. The debugger output shows the stamps on the Cti and the events themselves. Because the Cti issued has the same timestamp (StartTime) as the events, none of the events get flushed.
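The "strictly before" rule is easy to check with a small sketch. Again this is a simplified model under the assumption stated in the text, that a Cti releases only events whose StartTime is strictly earlier than the Cti; the names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CtiFlushSketch
{
    // Counts the events a Cti would release under the simplified rule
    // "StartTime strictly less than the Cti timestamp".
    public static int ReleasedCount(IEnumerable<DateTimeOffset> startTimes, DateTimeOffset cti)
    {
        return startTimes.Count(startTime => startTime < cti);
    }
}
```

For three events sharing one StartTime, a Cti at that same StartTime releases none of them, while a Cti just 1 tick later releases all three.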
I was nearly there but not quite. Because my stream was bursty it was possible that the next event would not come along for a few seconds and this was far too long for an event to be enqueued and not be flushed to the output adapter. I needed another solution. Two possible solutions crossed my mind although only one of them made sense when I explored it some more.
Where multiple events have the same StartTime, add 1 tick to the first event, two to the second, three to the third and so on, thereby giving them unique StartTime values.
Add a timer to manually inject Ctis.
The problem with the first implementation is that I would be giving the events a new StartTime. This would cause me the following problems:
If I want to define windows over the stream then some events may not be captured in the right windows, and therefore any calculations I did on those windows would be wrong.
What would happen if we had 10,000 events with the same StartTime? I would enqueue them with StartTime + n ticks. Along comes a genuine event with a StartTime of the very first event + 1 tick. It is now too far in the past as far as my stream is concerned and it would be dropped. Not what I would want to do at all.
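The second problem can be shown with a little arithmetic. The sketch below is an illustration only, using hypothetical helper names and the simplified rule that an event older than the latest Cti is dropped. Note that a .NET tick is 100 nanoseconds, so 10,000 ticks amount to a full millisecond of drift.

```csharp
using System;

public static class TickOffsetSketch
{
    // The n-th event (1-based) sharing a StartTime is shifted forward by n ticks.
    public static DateTimeOffset Shifted(DateTimeOffset sharedStartTime, int n)
    {
        return sharedStartTime.AddTicks(n);
    }

    // Simplified rule: an event whose StartTime is behind the latest Cti is dropped.
    public static bool WouldBeDropped(DateTimeOffset eventStartTime, DateTimeOffset latestCti)
    {
        return eventStartTime < latestCti;
    }
}
```

After 10,000 shifted events the stream has advanced to at least Shifted(start, 10000), so a genuine event stamped at start + 1 tick is already behind the latest Cti and WouldBeDropped returns true.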
I decided then to look at the timer-based solution.
I created a timer on my input adapter that elapsed every 200ms.
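private Timer tmr; // System.Timers.Timer
public SimpleInputAdapter(SimpleInputConfig configInfo)
{
    ctx = new SimpleTimeExtractDataContext(configInfo.ConnectionString);
    this.configInfo = configInfo;
    // Check every 200ms whether the stream has gone quiet.
    tmr = new Timer(200);
    tmr.Elapsed += new ElapsedEventHandler(t_Elapsed);
    tmr.Enabled = true;
}
void t_Elapsed(object sender, ElapsedEventArgs e)
{
    // Time elapsed since the value recorded for the last real event.
    ts = DateTime.Now - dtCtiIssued;
    // Issue a Cti only if the stream has been idle and the timer has not
    // already issued one since the last real event.
    if (ts.TotalMilliseconds >= 200 && !TimerIssuedCti)
    {
        EnqueueCtiEvent(System.DateTime.Now.AddTicks(-100));
        TimerIssuedCti = true;
    }
}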
In the t_Elapsed event handler I find out the difference in time between now and when the last event was processed (dtCtiIssued). I then check whether that difference is greater than or equal to 200ms and whether the last Cti was issued by the timer or by a genuine event (TimerIssuedCti). If I didn’t do this check then I would enqueue a Cti every time the timer elapsed, which is not something I wanted. If the difference between the two times is greater than or equal to 200ms and the last Cti came from a real event, then I issue a Cti through the timer to flush the event queue; otherwise I do nothing.
When I enqueue events into my stream in my ProduceEvents method I also set the values of dtCtiIssued and TimerIssuedCti:
currEvent = CreateInsertEvent();
currEvent.StartTime = (DateTimeOffset)dt.c2;
TimerIssuedCti = false;
dtCtiIssued = currEvent.StartTime;
If I go ahead and run this configuration I see the following in my output.
As we can see, the first Cti gets enqueued as before, but then another is enqueued by the timer and, because this has a later timestamp, it flushes the enqueued events through the engine.
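The decision logic behind the timer can be isolated from the adapter and tested on its own. The class below is a sketch of that idea rather than the code from my solution: the IdleCtiGate name and its members are hypothetical, and it tracks the wall-clock arrival time of the last event (passed in by the caller) rather than the event's StartTime, which is an assumption made to keep the example deterministic.

```csharp
using System;

public sealed class IdleCtiGate
{
    private readonly object sync = new object();
    private readonly TimeSpan idleThreshold;
    private DateTimeOffset lastEventSeen;
    private bool timerIssuedCti = true; // nothing to flush until an event arrives

    public IdleCtiGate(TimeSpan idleThreshold, DateTimeOffset startedAt)
    {
        this.idleThreshold = idleThreshold;
        lastEventSeen = startedAt;
    }

    // Call whenever a real event is enqueued.
    public void OnEvent(DateTimeOffset now)
    {
        lock (sync)
        {
            lastEventSeen = now;
            timerIssuedCti = false;
        }
    }

    // Call from the timer callback; returns true when a Cti should be issued.
    public bool OnTimerTick(DateTimeOffset now)
    {
        lock (sync)
        {
            if (timerIssuedCti || now - lastEventSeen < idleThreshold)
            {
                return false;
            }

            timerIssuedCti = true; // at most one timer Cti per quiet period
            return true;
        }
    }
}
```

The lock is there because a System.Timers.Timer callback normally runs on a thread pool thread, so the flag can be touched from two threads at once. In an adapter, the timer handler would call OnTimerTick and enqueue a Cti only when it returns true.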
Conclusion
Hopefully this has shown how the enqueuing of Ctis can have a dramatic effect on the responsiveness of your output in StreamInsight. Understanding the temporal nature of the product is for me one of the most important things you can learn. I have attached my solution for the demos. It is all in one project and testing each variation is a simple matter of commenting and un-commenting the parts in the code we have been dealing with here.