Rx IObservable buffering to smooth out bursts of events
- by Dan
I have an Observable sequence that produces events in rapid bursts (ie: five events one right after another, then a long delay, then another quick burst of events, etc.). I want to smooth out these bursts by inserting a short delay between events. Imagine the following diagram as an example:
Raw: --oooo--------------ooooo-----oo----------------ooo|
Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o|
My current approach is to generate a metronome-like timer via Observable.Interval() that signals when it's ok to pull another event from the raw stream. The problem is that I can't figure out how to then combine that timer with my raw unbuffered observable sequence.
IObservable.Zip() is close to doing what I want, but it only works so long as the raw stream is producing events faster than the timer. As soon as there is a significant lull in the raw stream, the timer builds up a series of unwanted events that then immediately pair up with the next burst of events from the raw stream.
Ideally, I want an IObservable extension method with the following function signature that produces the bevaior I've outlined above. Now, come to my rescue StackOverflow :)
public static IObservable<T> Buffered(this IObservable<T> src, TimeSpan minDelay)
PS. I'm brand new to Rx, so my apologies if this is a trivially simple question...
1. Simple yet flawed approach
Here's my initial naive and simplistic solution that has quite a few problems:
public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
Queue<T> q = new Queue<T>();
source.Subscribe(x => q.Enqueue(x));
return Observable.Interval(minDelay).Where(_ => q.Count > 0).Select(_ => q.Dequeue());
}
The first obvious problem with this is that the IDisposable returned by the inner subscription to the raw source is lost and therefore the subscription can't be terminated. Calling Dispose on the IDisposable returned by this method kills the timer, but not the underlying raw event feed that is now needlessly filling the queue with nobody left to pull events from the queue.
The second problem is that there's no way for exceptions or end-of-stream notifications to be propogated through from the raw event stream to the buffered stream - they are simply ignored when subscribing to the raw source.
And last but not least, now I've got code that wakes up periodically regardless of whether there is actually any work to do, which I'd prefer to avoid in this wonderful new reactive world.
2. Way overly complex appoach
To solve the problems encountered in my initial simplistic approach, I wrote a much more complicated function that behaves much like IObservable.Delay() (I used .NET Reflector to read that code and used it as the basis of my function). Unfortunately, a lot of the boilerplate logic such as AnonymousObservable is not publicly accessible outside the system.reactive code, so I had to copy and paste a lot of code. This solution appears to work, but given its complexity, I'm less confident that its bug free.
I just can't believe that there isn't a way to accomplish this using some combination of the standard Reactive extensions. I hate feeling like I'm needlessly reinventing the wheel, and the pattern I'm trying to build seems like a fairly standard one.