Changes to the LINQ-to-StreamInsight Dialect
- by Roman Schindlauer
In previous versions of StreamInsight (1.0 through 2.0), CepStream<> represents temporal streams of many varieties:
Streams with ‘open’ inputs (e.g., those defined and composed over CepStream<T>.Create(string streamName))
Streams with ‘partially bound’ inputs (e.g., those defined and composed over CepStream<T>.Create(Type adapterFactory, …))
Streams with fully bound inputs (e.g., those defined and composed over To*Stream – sequences or DQC)
The stream may be embedded (where Server.Create is used)
The stream may be remote (where Server.Connect is used)
When adding support for new programming primitives in StreamInsight 2.1, we faced a choice: add a fourth variety (use CepStream<> to represent streams that are bound to the new programming model constructs), or introduce a separate type that represents temporal streams in the new user model. We opted for the latter. Introducing a new type reduces the number of (confusing) runtime failures caused by using CepStream<> instances in the wrong context. The new types are:
IStreamable<>, which logically represents a temporal stream.
IQStreamable<> : IStreamable<>, which represents a queryable temporal stream. Its relationship to IStreamable<> is analogous to the relationship of IQueryable<> to IEnumerable<>. The developer can compose temporal queries over remote stream sources using this type.
The syntax of temporal queries composed over IQStreamable<> is mostly consistent with the syntax of our existing CepStream<>-based LINQ provider. However, we have taken the opportunity to refine certain aspects of the language surface. Differences are outlined below. Because 2.1 introduces new types to represent temporal queries, the changes outlined in this post do not impact existing StreamInsight applications using the existing types!
SelectMany
StreamInsight does not support the SelectMany operator in its usual form (which is analogous to SQL’s “CROSS APPLY” operator):
static IEnumerable<R> SelectMany<T, R>(this IEnumerable<T> source, Func<T, IEnumerable<R>> collectionSelector)
It instead uses SelectMany as a convenient syntactic representation of an inner join. The parameter to the selector function is thus unavailable. Because the parameter isn’t supported, its type in StreamInsight 1.0 – 2.0 wasn’t carefully scrutinized. Unfortunately, the type chosen for the parameter is nonsensical to LINQ programmers:
static CepStream<R> SelectMany<T, R>(this CepStream<T> source, Expression<Func<CepStream<T>, CepStream<R>>> streamSelector)
Using Unit as the type for the parameter accurately reflects StreamInsight’s capabilities:
static IQStreamable<R> SelectMany<T, R>(this IQStreamable<T> source, Expression<Func<Unit, IQStreamable<R>>> streamSelector)
For queries that succeed – that is, queries that do not reference the stream selector parameter – there is no difference between the code written for the two overloads:
from x in xs from y in ys select f(x, y)
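To see why an unused selector parameter makes the two overloads indistinguishable at the call site, it may help to look at how the C# compiler desugars the same comprehension over ordinary in-memory sequences. The sketch below uses only LINQ to Objects (IEnumerable<>, not StreamInsight types); the class and method names are made up for illustration, and x * 10 + y stands in for f(x, y).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustration only: plain LINQ to Objects, not the StreamInsight provider.
public static class SelectManySketch
{
    // Query-comprehension form. The second source does not depend on x.
    public static List<int> Comprehension(IEnumerable<int> xs, IEnumerable<int> ys) =>
        (from x in xs
         from y in ys
         select x * 10 + y).ToList();

    // Roughly what the compiler generates. The collection selector receives
    // a parameter but never reads it, so its type carries no information.
    public static List<int> Desugared(IEnumerable<int> xs, IEnumerable<int> ys) =>
        xs.SelectMany(_ => ys, (x, y) => x * 10 + y).ToList();
}
```

Because the selector's parameter is never referenced, both methods return the same pairs in the same order.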
Top-K
The Take operator used in StreamInsight causes confusion for LINQ programmers because it is applied to the (unbounded) stream rather than the (bounded) window, suggesting that the query as a whole will return k rows:
(from win in xs.SnapshotWindow() from x in win orderby x.A select x.B).Take(k)
The use of SelectMany is also unfortunate in this context because it implies the availability of the window parameter within the remainder of the comprehension. The following compiles but fails at runtime:
(from win in xs.SnapshotWindow() from x in win orderby x.A select win).Take(k)
The Take operator in 2.1 is applied to the window rather than the stream:
Before:
(from win in xs.SnapshotWindow() from x in win orderby x.A select x.B).Take(k)
After:
from win in xs.SnapshotWindow() from b in (from x in win orderby x.A select x.B).Take(k) select b
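The difference in where Take is applied can be illustrated with ordinary collections. In the sketch below, each inner sequence stands in for one window; this is an in-memory analogy over IEnumerable<> with hypothetical names, not StreamInsight code, and it ignores all temporal semantics.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustration only: each inner sequence plays the role of one window.
public static class TopKSketch
{
    // Take applied inside the window: up to k results per window.
    public static List<string> TopKPerWindow(
        IEnumerable<IEnumerable<(int A, string B)>> windows, int k) =>
        (from win in windows
         from b in (from x in win orderby x.A select x.B).Take(k)
         select b).ToList();

    // Take applied to the flattened result: k results in total, which is
    // what a LINQ programmer would read into the old 'Before' shape.
    public static List<string> TopKOverall(
        IEnumerable<IEnumerable<(int A, string B)>> windows, int k) =>
        (from win in windows
         from x in win
         orderby x.A
         select x.B).Take(k).ToList();
}
```

With two windows and k = 2, the first method yields up to two rows from each window, while the second yields two rows overall.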
Multicast
We are introducing an explicit multicast operator in order to preserve expression identity, which is important given the semantics about moving code to and from StreamInsight. This also better matches existing LINQ dialects, such as Reactive. This pattern enables expressing multicasting in two ways:
Implicit:
var ys = from x in xs where x.A > 1 select x;
var zs = from y1 in ys from y2 in ys.ShiftEventTime(_ => TimeSpan.FromSeconds(1)) select y1 + y2;
Explicit:
var ys = from x in xs where x.A > 1 select x;
var zs = ys.Multicast(ys1 => from y1 in ys1 from y2 in ys1.ShiftEventTime(_ => TimeSpan.FromSeconds(1)) select y1 + y2);
Notice the product translates an expression using implicit multicast into an expression using the explicit multicast operator. The user does not see this translation.
Default window policies
Only default window policies are supported in the new surface. Other policies can be simulated by using AlterEventLifetime.
Before: xs.SnapshotWindow(WindowInputPolicy.ClipToWindow, SnapshotWindowInputPolicy.Clip)
After: xs.SnapshotWindow()
Before: xs.TumblingWindow(TimeSpan.FromSeconds(1), HoppingWindowOutputPolicy.PointAlignToWindowEnd)
After: xs.TumblingWindow(TimeSpan.FromSeconds(1))
Before: xs.TumblingWindow(TimeSpan.FromSeconds(1), HoppingWindowOutputPolicy.ClipToWindowEnd)
After: Not supported
…
LeftAntiJoin
Representation of LASJ as a correlated sub-query in the LINQ surface is problematic as the StreamInsight engine does not support correlated sub-queries (see discussion of SelectMany). The current syntax requires the introduction of an otherwise unsupported ‘IsEmpty()’ operator. As a result, the pattern is not discoverable and implies capabilities not present in the server. The direct representation of LASJ is used instead:
Before: from x in xs where (from y in ys where x.A > y.B select y).IsEmpty() select x
After: xs.LeftAntiJoin(ys, (x, y) => x.A > y.B)
Before: from x in xs where (from y in ys where x.A == y.B select y).IsEmpty() select x
After: xs.LeftAntiJoin(ys, x => x.A, y => y.B)
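For readers who have not met left anti-semi-join before, the predicate form can be sketched over in-memory sequences: keep every left element for which no right element satisfies the predicate. The extension method below is a hypothetical IEnumerable<> analogue written for this illustration; it is not the StreamInsight operator and has no notion of event lifetimes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustration only: an in-memory analogue, not the StreamInsight operator.
public static class LeftAntiJoinSketch
{
    // Keeps each left element that matches no right element.
    public static IEnumerable<TLeft> LeftAntiJoin<TLeft, TRight>(
        this IEnumerable<TLeft> left,
        IEnumerable<TRight> right,
        Func<TLeft, TRight, bool> predicate)
    {
        // Materialize the right side once so it is not re-enumerated per left element.
        var rightList = right.ToList();
        return left.Where(l => !rightList.Any(r => predicate(l, r)));
    }
}
```

In this analogy, the correlated sub-query with IsEmpty() corresponds to the !Any(...) test, which is exactly the part the direct operator hides from the query author.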
ApplyWithUnion
The ApplyWithUnion methods have been deprecated since their signatures are redundant given the standard SelectMany overloads:
Before: xs.GroupBy(x => x.A).ApplyWithUnion(gs => from win in gs.SnapshotWindow() select win.Count())
After: xs.GroupBy(x => x.A).SelectMany(gs => from win in gs.SnapshotWindow() select win.Count())
Before: xs.GroupBy(x => x.A).ApplyWithUnion(gs => from win in gs.SnapshotWindow() select win.Count(), r => new { r.Key, Count = r.Payload })
After: from x in xs group x by x.A into gs from win in gs.SnapshotWindow() select new { gs.Key, Count = win.Count() }
Alternate UDO syntax
The representation of UDOs in the StreamInsight LINQ dialect confuses cardinalities. Based on the semantics of user-defined operators in StreamInsight, one would expect to construct queries in the following form:
from win in xs.SnapshotWindow() from y in MyUdo(win) select y
Instead, the UDO proxy method is referenced within a projection, and the (many) results returned by the user code are automatically flattened into a stream:
from win in xs.SnapshotWindow() select MyUdo(win)
The “many-or-one” confusion is exemplified by the following example that compiles but fails at runtime:
from win in xs.SnapshotWindow() select MyUdo(win) + win.Count()
The above query must fail because the UDO is in fact returning many values per window while the count aggregate is returning one.
Original syntax:
from win in xs.SnapshotWindow() select win.UdoProxy(1)
New alternate syntax:
from win in xs.SnapshotWindow() from y in win.UserDefinedOperator(() => new Udo(1)) select y
-or-
from win in xs.SnapshotWindow() from y in win.UdoMacro(1) select y
Notice that this formulation also sidesteps the dynamic type pitfalls of the existing “proxy method” approach to UDOs, in which the type of the UDO implementation (TInput, TOutput) and the type of its constructor arguments (TConfig) need to align in a precise and non-obvious way with the argument and return types for the corresponding proxy method.
UDSO syntax
UDSO currently leverages the DataContractSerializer to clone initial state for logical instances of the user operator. Initial state will instead be described by an expression in the new LINQ surface.
Before: xs.Scan(new Udso())
After: xs.Scan(() => new Udso())
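One way to read this change: a factory expression can be invoked once per logical instance, so each instance starts from fresh state without any serialization-based cloning. The sketch below shows that idea with plain C# objects; Counter and CountPerKey are hypothetical names invented here, and the per-key dictionary is only a stand-in for however the engine actually manages logical operator instances.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustration only: hypothetical types, not StreamInsight's UDSO machinery.
public sealed class Counter
{
    public int Count;
    public int Next() => ++Count;
}

public static class StateFactorySketch
{
    // Creates one state object per key by calling the factory on first use.
    public static Dictionary<string, int> CountPerKey(
        IEnumerable<string> keys, Func<Counter> createState)
    {
        var states = new Dictionary<string, Counter>();
        foreach (var key in keys)
        {
            if (!states.TryGetValue(key, out var state))
            {
                state = createState();
                states[key] = state;
            }
            state.Next();
        }
        return states.ToDictionary(kv => kv.Key, kv => kv.Value.Count);
    }
}
```

Passing () => new Counter() gives every key an independent count. Passing a factory that returns one shared instance makes all keys observe the same state, which is the kind of aliasing that cloning (or a factory) is meant to prevent.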
Name changes
ShiftEventTime => AlterEventStartTime: The alter event lifetime overload taking a new start time value has been renamed.
CountByStartTimeWindow => CountWindow