RX Framework Performance Awareness

In an earlier post I implemented a DifferentiateWithTime operator in terms of other RX operators. I closed that post by saying: “It does say in section 6.1 of the RX Design Guidelines that new operators should be composed of existing operators, unless performance is a concern – this is something I may get around to investigating in a future blog post.” I thought I’d finally get around to looking at this.

As a recap, the differentiate operator was written using a sliding window operator (updated to use RX 1.1 experimental):

    public static IObservable<double> DifferentiateWithTime(
        this IObservable<double> obs)
    {
        return (from j in obs.SlidingWindow(2)
                select j[1] - j[0]);
    }

    // from http://social.msdn.microsoft.com/Forums/en-US/rx/thread/37428f58-f241-45b3-a878-c1627deb9ac4#bcdc7b79-bbde-4145-88e4-583685285682
    public static IObservable<IList<TSource>> SlidingWindow<TSource>(
        this IObservable<TSource> source,
        int count)
    {
        Contract.Requires(source != null);
        Contract.Requires(count >= 0);

        return source.Publish(published =>
            from x in published
            from buffer in published.StartWith(x).Buffer(count).Take(1)
            where buffer.Count == count
            select buffer
        );
    }


I created a simple test fixture to exercise this.



    const double accelerationGravity = 9.81;
    var count = 10000.0;
    var positions = Observable.Generate(0.0,
        i => i < count,
        i => i + 1.0,
        i => accelerationGravity * i * i / 2.0);

    var stopwatch = new Stopwatch();
    stopwatch.Start();

    var velocity = positions.DifferentiateWithTime();

    var sumVelocity = 0.0;
    using (velocity.Subscribe(i => { sumVelocity += i; })) { }

    var acceleration = velocity.DifferentiateWithTime();

    var sumAcceleration = 0.0;
    using (acceleration.Subscribe(i => { sumAcceleration += i; })) { }

    stopwatch.Stop();

    Console.WriteLine("sumVel:{0} sumAcc:{1} time:{2}",
        sumVelocity,
        sumAcceleration,
        stopwatch.ElapsedMilliseconds);



Monitoring in Perfmon, I saw that the 10000 iterations triggered 170 Generation 0 garbage collections, which seems excessive for such a simple calculation. The total time was 6580 milliseconds.
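As an alternative to watching Perfmon, the collection count can also be read from inside the process. The following is only a minimal sketch: GcProbe and CountGen0Collections are hypothetical names that are not part of the original test fixture, and because GC.CollectionCount is process-wide, allocations made on other threads will be included in the figure.

```csharp
using System;

public static class GcProbe
{
    // Hypothetical helper (not from the original post): runs the action and
    // returns how many Generation 0 collections occurred while it was running.
    // The count is process-wide, so work on other threads is included too.
    public static int CountGen0Collections(Action action)
    {
        if (action == null) throw new ArgumentNullException("action");

        int before = GC.CollectionCount(0);
        action();
        return GC.CollectionCount(0) - before;
    }
}
```

Wrapping the body of the test fixture in a lambda and passing it to GcProbe.CountGen0Collections should give a number comparable to the Perfmon counter, without needing a separate tool.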



I then implemented the DifferentiateWithTime operator directly:



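    public static IObservable<double> DifferentiateWithTime(
        this IObservable<double> obs)
    {
        return Observable.Create<double>(o =>
        {
            // Per-subscription state: the last value seen, and whether
            // a value has been seen yet.
            double previousValue = 0.0;
            var initialized = false;

            return obs.Subscribe(
                i =>
                {
                    if (initialized)
                    {
                        o.OnNext(i - previousValue);
                    }
                    else
                    {
                        initialized = true;
                    }
                    previousValue = i;
                },
                // Forward errors and completion, as the composed version does.
                o.OnError,
                o.OnCompleted);
        });
    }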



This time there were only 4 Generation 0 garbage collections during the test run and, even more excitingly, the execution time was 379 milliseconds. A roughly 17-fold speedup (6580 ms / 379 ms ≈ 17.4) is quite impressive, and shows that it’s worth being careful!



It’s likely that other operations based on sliding windows (rolling averages, VWAP, etc.) have similar issues and would benefit from similar hand-written implementations. Instead of forcing the user to make these changes manually, it may be possible to do this in a more automated way (I’ve got some ideas I’ll play with when I get time).
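To illustrate how the same approach might carry over to a rolling average, here is a minimal sketch. It was not part of the measurements above and has not been benchmarked; RollingOperators, RollingAverage and windowSize are hypothetical names chosen for this example. It keeps a queue and a running sum per subscription instead of materialising a buffer for every element.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RollingOperators
{
    // Hypothetical operator (an assumption for illustration, not from the
    // original post): emits the mean of the last windowSize values once
    // that many values have arrived.
    public static IObservable<double> RollingAverage(
        this IObservable<double> source, int windowSize)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (windowSize <= 0) throw new ArgumentOutOfRangeException("windowSize");

        return Observable.Create<double>(observer =>
        {
            // Per-subscription state: the current window and its running sum.
            var window = new Queue<double>(windowSize + 1);
            var sum = 0.0;

            return source.Subscribe(
                value =>
                {
                    window.Enqueue(value);
                    sum += value;

                    // Drop the oldest value once the window overflows.
                    if (window.Count > windowSize)
                    {
                        sum -= window.Dequeue();
                    }

                    if (window.Count == windowSize)
                    {
                        observer.OnNext(sum / windowSize);
                    }
                },
                observer.OnError,
                observer.OnCompleted);
        });
    }
}
```

For example, positions.RollingAverage(3) would yield one value per input element from the third element onwards. Note that a running sum of doubles can accumulate rounding error over a very long stream, so periodically recomputing the sum from the queue may be worthwhile.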

