How is IObservable<double>.Average supposed to work?

Posted by Dan Tao on Stack Overflow See other posts from Stack Overflow or by Dan Tao
Published on 2010-05-07T19:59:06Z Indexed on 2010/05/07 22:18 UTC
Read the original article Hit count: 379

Filed under:
|
|
|
|

Update

Looks like Jon Skeet was right (big surprise!) and the issue was with my assumption about the Average extension providing a continuous average (it doesn't).

For the behavior I'm after, I wrote a simple ContinuousAverage extension method, the implementation of which I am including here for the benefit of others who may want something similar:

public static class ObservableExtensions {
    private class ContinuousAverager {
            private double _mean;
            private long _count;

        public ContinuousAverager() {
            _mean = 0.0;
            _count = 0L;
        }

        // undecided whether this method needs to be made thread-safe or not
        // seems that ought to be the responsibility of the IObservable (?)
        public double Add(double value) {
            double delta = value - _mean;
            _mean += (delta / (double)(++_count));
            return _mean;
        }
    }

    public static IObservable<double> ContinousAverage(this IObservable<double> source) {
        var averager = new ContinuousAverager();

        return source.Select(x => averager.Add(x));
    }
}

I'm thinking of going ahead and doing something like the above for the other obvious candidates as well -- so, ContinuousCount, ContinuousSum, ContinuousMin, ContinuousMax ... perhaps ContinuousVariance and ContinuousStandardDeviation as well? Any thoughts on that?


Original Question

I use Rx Extensions a little bit here and there, and feel I've got the basic ideas down.

Now here's something odd: I was under the impression that if I wrote this:

var ticks = Observable.FromEvent<QuoteEventArgs>(MarketDataProvider, "MarketTick");

var bids = ticks
    .Where(e => e.EventArgs.Quote.HasBid)
    .Select(e => e.EventArgs.Quote.Bid);

var bidsSubscription = bids.Subscribe(
    b => Console.WriteLine("Bid: {0}", b)
);

var avgOfBids = bids.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
    b => Console.WriteLine("Avg Bid: {0}", b)
);

I would get two IObservable<double> objects (bids and avgOfBids); one would basically be a stream of all the market bids from my MarketDataProvider, the other would be a stream of the average of these bids.

So something like this:

Bid    Avg Bid
1      1
2      1.5
1      1.33
2      1.5

It seems that my avgOfBids object isn't doing anything. What am I missing? I think I've probably misunderstood what Average is actually supposed to do. (This also seems to be the case for all of the aggregate-like extension methods on IObservable<T> -- e.g., Max, Count, etc.)

© Stack Overflow or respective owner

Related posts about .NET

Related posts about rx