Make Async Your Buddy With Reactive Extensions

code, rx comments edit

For a long time, good folks like Matt Podwysocki have extolled the virtues of Reactive Extensions (aka Rx) to me. It piqued my interest enough for me to write a post about it, but that was the extent of it. It sounded interesting, but it didn’t have any relevance to any projects I had at the time.

Fortunately, now that I work at GitHub I have the pleasure to work with an Rx Guru, Paul Betts, on a project that actively uses Rx. And man, is my mind blown by Rx.

Hits Me Like A Hurricane

What really blew me away about Rx is how it allows you to handle complex async interactions declaratively. No need to chain callbacks together or worry about race conditions. With Rx, you can easily compose multiple async operations together. It’s powerful.

The way I describe it to folks is to think of how the IEnumerable and IEnumerator are involved when iterating over an enumerable. Now take those and reverse the polarity. That’s Rx. But with Rx, the IObservable and IObserver interfaces are involved and rather than enumerate over existing sequences, you write queries against sequences of future events.

Hear that? That’s the sound of my head asploding again.

the-future

Rx has a tendency to twist and contort the mind in strange ways. But it’s really not all that complicated. It only hurts the head at first because it’s a new way to think about async, sequences, and queryies for many folks.

Here’s a simple example that helps demonstrate the power of Rx. Say you’re writing a client app (such as a WPF application) and want to save the application to persist its window’s position and size. That way, the next time the app starts, the position is restored.

How you save the position isn’t so important, but if you’re curious, I found this post, Saving window size and location in WPF and WinForms, helpful.

I modified it in two ways for my needs. First, I replaced the Settings object with an asynchronous cache as the storage for the placement info.

I then changed it to save the placement info when the window is resized, rather than when the application exits. That way, if the app crashes, it won’t forget its last position.

Handling Resize Events

So let’s think about this a bit. When you resize a window, the resize event might be fired a large number of times. We probably don’t want to save the position on every one of those calls. It’s not just a performance problem, but it could be a data corruption problem if I’m using an async method to save the placement. It might be possible for a later call to occur before an earlier call when so many happen so close together.

What we really want to do is save the setting when there’s a pause during a resize operation. For example, a user starts to resize the window, then stops. Five seconds later, if there’s been no other resize operation, only then do we save the setting.

How would you do this with traditional code? You could probably figure it out, ut it’d be ugly. Perhaps have the resize event start a timer for five seconds, if it isn’t started already. Each subsequent event would reset the timer. When the timer finishes, it saves the setting and turns itself off. The code is going to be a bit gnarly and all over the place.

Here’s what it looks like with Rx.

Observable.FromEventPattern<SizeChangedEventHandler, SizeChangedEventArgs>
    (h => SizeChanged += h, h => SizeChanged -= h)
    .Throttle(TimeSpan.FromSeconds(5), RxApp.DeferredScheduler)
    .Subscribe(_ => this.SavePlacement());

That’s it! Nice and self contained in a single expression.

Let’s break it down a bit.

Observable.FromEventPattern<SizeChangedEventHandler, SizeChangedEventArgs>
    (h => SizeChanged += h, h => SizeChanged -= h)

This first part of the expression converts the SizeChangedEvent into an observable. The specific type of this observable is IObservable<EventPattern<SizeChangedEventArgs>>. This is analogous to an IEnumerable<EventPattern<SizeChangedEventArgs>>, but with its polarity reversed. Having an observable will allow us to subscribe to a stream of size changed events. But first:

.Throttle(TimeSpan.FromSeconds(5), RxApp.DeferredScheduler)

This next part of the expression uses the Throttle method to throttle the sequence of events coming from the observable. It will ignore events in the sequence if a newer one arrives within the specified time span. In other words, this observable won’t return any item until there’s a five second lull in events.

The RxApp.DeferredScheduler comes from the ReactiveUI framework and is equivalent to new DispatcherScheduler(Application.Current.Dispatcher). It indicates which scheduler to run the throttle timers on. In this case, we indicate the dispatcher scheduler which runs the throttle timer on the UI thread.

.Subscribe(_ => this.SavePlacement());

And we end with the Subscribe call. This method takes in an Action to run for each item in the observable sequence when it arrives. This is where we do the work to actually save the window placement.

Putting it all together, every time a resize event is succeeded by a five second lull, we save the placement of the window.

But wait, compose more

Ok, that’s pretty cool. But to write imperative code to do this would be slightly ugly and not all that hard. Ok, let’s up the stakes a bit, shall we?

We forgot something. You don’t just want to save the placement of the window when it’s resized. You also want to save it when it’s moved.

So we really need to observe two sequences of events, but still throttle both of them as if they were one sequence. In other words, when either a resize or move event occurs, the timer is restarted. And only when five seconds have passed since either event has occurred, do we save the window placement.

The traditional way to code this is going to be very ugly.

This is where Rx shines. Rx provides ways to compose observables in very interesting ways. In this case we’ll deal with two observables, the one we already created that handles SizeChanged events, and a new one that handles LocationChanged events.

Here’s the code for the LocationChanged observable. I’ll save the observable into an intermediate variable for clarity. It’s exactly what you’d expect.

var locationChanges = Observable.FromEventPattern<EventHandler, EventArgs>
  (h => LocationChanged += h, h => LocationChanged -= h);

I’ll do the same for the SizeChanged event.

var sizeChanges = Observable.FromEventPattern
    <SizeChangedEventHandler, SizeChangedEventArgs>
    (h => SizeChanged += h, h => SizeChanged -= h);

We can use the Observable.Merge method to merge these sequences into a single sequence. But going back to the IEnumerable analogy, these are both sequences of different types. If you had two enumerables of different types and wanted to combine them into a single enumerable, what would you do? You’d apply a transformation with the Select method! And that’s what we do here too.

Since I don’t care what the event arguments are, just when they arrive, I’ll transform each sequence into an IObservable<Unit.Default> by calling Select(_ => Unit.Default) on each observable. Unit is an Rx type that indicates there’s no information. It’s like returning void.

var merged = Observable.Merge(
    sizeChanges.Select(_ => Unit.Default), 
    locationChanges.Select(_ => Unit.Default)
);

I’ll then call Observable.Merge to merge the two sequences together into a single sequence of event args.

Now, with this combined sequence, I can simply apply the same throttle and subscription I did before.

merged
    .Throttle(TimeSpan.FromSeconds(5), RxApp.DeferredScheduler)
    .Subscribe(_ => this.SavePlacement());

Think about that for a second. I was able to compose various sequences of events and into a single observable and I didn’t have to change the code to throttle the events or to subscribe to them.

As you get more familiar with Rx, it starts to get easier to read the code and you tend to use less intermediate variables. Here’s the full more idiomatic expression.

Observable.Merge(
    Observable.FromEventPattern<SizeChangedEventHandler, SizeChangedEventArgs>
        (h => SizeChanged += h, h => SizeChanged -= h)
        .Select(e => Unit.Default),
    Observable.FromEventPattern<EventHandler, EventArgs>
        (h => LocationChanged += h, h => LocationChanged -= h)
        .Select(e => Unit.Default)
).Throttle(TimeSpan.FromSeconds(5), RxApp.DeferredScheduler)
.Subscribe(_ => this.SavePlacement());

That single declarative expression handles so much crazy logic. Very powerful stuff.

Even if you don’t write WPF apps, there’s still probably something useful here for you. This same powerful approach is also available for JavaScript.

See it in action

I put together a really rough sample app that demonstrates this concept. It’s not using the async cache, but it is using Rx to throttle resize and move events and then save the placement of the window after five seconds.

Just grab the WindowPlacementRxDemo project from my CodeHaacks GitHub repository.

More Info

For more info on Reactive Extensions, I recommend the following:

Tags: Rx, Reactive-Extensions, RxUI, Reactive-UI, WPF

Comments