As I said in my previous blog post, one of my previous roles involved performing real-time analysis of incoming sensor and telemetry data: combining streams with different data rates, allowing users to write their own calculations on the data, and displaying the results. I wondered what this would have been like to implement using RX.
As an aside, it may be that the new TPL Dataflow library would be more suitable for this. I am a bit confused about the overlap between so many different technologies, such as RX and the forthcoming C# 5.0 async, and about whether TPL Dataflow replaces the Actor model in Microsoft Axum or F#.
Anyway, back with the plot…
Imagine that you have to handle incoming streams of sensor data. This doesn’t have to be hardware sensor data; it could be a phone’s GPS or accelerometer data, or incoming streams of trade data. In the old style of programming, a callback or event handler would be triggered when the data arrived, and performing aggregate calculations over multiple data streams would involve convoluted code with locks. With RX there’s none of that, just a simple LINQ query to combine the data.
Imagine that you had two streams of position data (here the observables are generated, but they could just as well be observables from events; the usage is identical, which is beautiful!):
var positions = Observable.Generate(0.0, i => i < 10.0, i => i + 1.0, i => i);
var positions2 = Observable.Generate(10.0, i => i >= 0.0, i => i - 1.0, i => i);

An operator to calculate the separation is as simple as:

var separation = positions
    .Zip(positions2, (i, j) => new { i, j })
    .Select(pos => Math.Abs(pos.j - pos.i));
As well as calculating the difference between two positions, you might want to find the rate of change of some input data:
class Program
{
    static void Main(string[] args)
    {
        const double accelerationGravity = 9.81;

        // Positions of a falling object sampled at i = 0, 1, ..., 9: g * i^2 / 2
        var positions = Observable.Generate(0.0, i => i < 10.0, i => i + 1.0, i => accelerationGravity * i * i / 2.0);

        var velocity = positions.DifferentiateWithTime();
        Console.WriteLine("-- velocities -- ");
        velocity.Subscribe(i =>
        {
            Console.WriteLine(i);
        });

        var acceleration = velocity.DifferentiateWithTime();
        Console.WriteLine("-- accelerations -- ");
        acceleration.Subscribe(i =>
        {
            Console.WriteLine(i);
        });
    }
}
Again, this is nice. The acceleration is correctly written out for every value as 9.81 (well, within rounding errors).
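To see why the second difference comes out as a constant, here is a short worked derivation. It assumes a sample spacing of exactly one second and uses the symbols p_i, v_i and a_i (my notation, not from the code) for the i-th position, velocity and acceleration values, with g = 9.81:

```latex
p_i = \tfrac{1}{2} g\, i^2, \qquad
v_i = p_i - p_{i-1} = \tfrac{1}{2} g \left( i^2 - (i-1)^2 \right) = \tfrac{1}{2} g\,(2i - 1), \qquad
a_i = v_i - v_{i-1} = \tfrac{1}{2} g \left( (2i - 1) - (2i - 3) \right) = g
```

So the velocities grow linearly (4.905, 14.715, ...) and every acceleration value equals g, up to floating-point rounding.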
DifferentiateWithTime is implemented as:
public static partial class ObservableEx2
{
    public static IObservable<double> DifferentiateWithTime(this IObservable<double> obs)
    {
        return (from j in obs.WindowWithCount(2) select j[1] - j[0]);
    }
..
In this case we’re taking the difference between the current and previous values. This of course assumes that we receive values exactly once per second. To handle other rates, we could include the time of each sample in a Tuple, divide the difference by the elapsed time, and use the Where operator to ignore samples that share the same timestamp (to avoid divide-by-zero errors).
I found WindowWithCount on Stack Overflow. It takes a rolling window of values from an observable, which can be used for operations such as a moving average.
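The timestamped variant described above could look something like the following. This is only a sketch under assumptions: it uses the operator names from current System.Reactive releases (`Buffer(2, 1)` stands in for `WindowWithCount(2)`), a value tuple of (Time, Value) with time in seconds is my own choice of representation, and the class names `TimedDifferentiation` and `TimedDemo` are hypothetical.

```csharp
using System;
using System.Reactive.Linq;

public static class TimedDifferentiation
{
    // Hypothetical variant of DifferentiateWithTime: each sample carries its own
    // time (assumed to be in seconds) alongside the measured value.
    public static IObservable<double> DifferentiateWithTime(
        this IObservable<(double Time, double Value)> samples)
    {
        return samples
            .Buffer(2, 1)                                 // overlapping pairs: [s0, s1], [s1, s2], ...
            .Where(pair => pair.Count == 2)               // drop the short buffer emitted on completion
            .Where(pair => pair[1].Time != pair[0].Time)  // skip pairs that would divide by zero
            .Select(pair => (pair[1].Value - pair[0].Value)
                          / (pair[1].Time - pair[0].Time));
    }
}

public static class TimedDemo
{
    public static void Main()
    {
        const double g = 9.81;

        // Irregularly spaced sample times, including a repeated timestamp at 1.0 s.
        var times = new[] { 0.0, 0.5, 1.0, 1.0, 2.0 };

        times.ToObservable()
             .Select(t => (Time: t, Value: g * t * t / 2.0))
             .DifferentiateWithTime()
             .Subscribe(v => Console.WriteLine(v));
    }
}
```

Note that the output here is a plain stream of doubles, so it cannot be differentiated a second time without re-attaching a time to each value; a fuller version would probably emit (Time, Value) pairs as well.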
public static IObservable<IList<TSource>> WindowWithCount<TSource>(this IObservable<TSource> source, int count)
{
    Contract.Requires(source != null);
    Contract.Requires(count >= 0);
    return source.Publish(published =>
        from x in published
        from buffer in published.StartWith(x).BufferWithCount(count).Take(1)
        where buffer.Count == count
        select buffer);
}
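As an illustration of the moving-average use mentioned above, here is a minimal sketch. It assumes the operator names from current System.Reactive releases, where (as far as I know) `Buffer(count, skip)` provides the same sliding windows directly; `MovingAverage` and the surrounding class names are hypothetical and not part of RX.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public static class MovingAverageSketch
{
    // Hypothetical operator: averages each full sliding window of `count` values.
    public static IObservable<double> MovingAverage(this IObservable<double> source, int count)
    {
        return source
            .Buffer(count, 1)                        // a new window starts at every sample
            .Where(window => window.Count == count)  // ignore the partial windows emitted at the end
            .Select(window => window.Average());     // Enumerable.Average over the window's list
    }
}

public static class MovingAverageDemo
{
    public static void Main()
    {
        // Windows are [1,2,3], [2,3,4] and [3,4,5].
        new[] { 1.0, 2.0, 3.0, 4.0, 5.0 }
            .ToObservable()
            .MovingAverage(3)
            .Subscribe(avg => Console.WriteLine(avg));
    }
}
```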
This is cool, but I am concerned that we’re returning a new list for each sample even though we’re only interested in two values. This could hammer the garbage collector at high data rates. Section 6.1 of the RX Design Guidelines does say that new operators should be composed of existing operators unless performance is a concern, so this is something I may get around to investigating in a future blog post.
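One possible direction, still composed of existing operators, is to carry only the previous value along with `Scan` instead of building a list per sample. The sketch below assumes current System.Reactive operator names and C# value tuples; `Differentiate` and the class names are hypothetical, and I have not measured whether it actually reduces garbage collector pressure.

```csharp
using System;
using System.Reactive.Linq;

public static class PairwiseDifference
{
    // Hypothetical alternative to WindowWithCount(2): the running state is a
    // value tuple (a struct), so no list is allocated for each sample.
    public static IObservable<double> Differentiate(this IObservable<double> source)
    {
        (double Previous, double Diff) seed = (double.NaN, double.NaN);

        return source
            .Scan(seed, (state, current) => (current, current - state.Previous))
            .Skip(1)                        // the first state has no previous sample to compare with
            .Select(state => state.Diff);
    }
}

public static class PairwiseDemo
{
    public static void Main()
    {
        const double accelerationGravity = 9.81;

        var positions = Observable.Generate(
            0.0, i => i < 10.0, i => i + 1.0,
            i => accelerationGravity * i * i / 2.0);

        // Differentiating twice should print values of approximately 9.81.
        positions.Differentiate()
                 .Differentiate()
                 .Subscribe(a => Console.WriteLine(a));
    }
}
```

Like the original, this assumes one sample per second, so the raw difference doubles as the rate of change.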