Make Async Your Buddy With Reactive Extensions

For a long time, good folks like Matt Podwysocki have extolled the virtues of Reactive Extensions (aka Rx) to me. It piqued my interest enough for me to write a post about it, but that was the extent of it. It sounded interesting, but it didn’t have any relevance to any projects I had at the time.

Fortunately, now that I work at GitHub I have the pleasure to work with an Rx Guru, Paul Betts, on a project that actively uses Rx. And man, is my mind blown by Rx.

Hits Me Like A Hurricane

What really blew me away about Rx is how it allows you to handle complex async interactions declaratively. No need to chain callbacks together or worry about race conditions. With Rx, you can easily compose multiple async operations together. It’s powerful.

The way I describe it to folks is to think of how the IEnumerable and IEnumerator interfaces are involved when iterating over an enumerable. Now take those and reverse the polarity. That’s Rx. With Rx, the IObservable and IObserver interfaces are involved instead, and rather than enumerating over existing sequences, you write queries against sequences of future events.

Hear that? That’s the sound of my head asploding again.

Rx has a tendency to twist and contort the mind in strange ways. But it’s really not all that complicated. It only hurts the head at first because it’s a new way to think about async, sequences, and queries for many folks.
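
If the polarity reversal feels abstract, here is a minimal sketch of it that uses only the IObservable<T> and IObserver<T> interfaces built into .NET, with no Rx library involved. PushSource, CollectingObserver, and PolarityDemo are hypothetical names made up for this illustration; real Rx code would normally lean on the library's own subjects and operators rather than hand-rolling these.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of Rx: a bare-bones push source built only on
// the IObservable<T>/IObserver<T> interfaces that ship in the System namespace.
public sealed class PushSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    // Subscribing registers a callback target; disposing the result unsubscribes.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(() => observers.Remove(observer));
    }

    // The source decides when a value arrives and pushes it to every observer.
    public void Publish(T value)
    {
        foreach (var observer in observers.ToArray())
            observer.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action dispose;
        public Unsubscriber(Action dispose) { this.dispose = dispose; }
        public void Dispose() { dispose(); }
    }
}

// Hypothetical observer that just records what it is handed.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public List<T> Received { get; } = new List<T>();
    public void OnNext(T value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class PolarityDemo
{
    // Pull: the consumer drives, asking the enumerator for the next item.
    public static List<int> Pull(IEnumerable<int> source)
    {
        var seen = new List<int>();
        using (IEnumerator<int> e = source.GetEnumerator())
        {
            while (e.MoveNext())
                seen.Add(e.Current);
        }
        return seen;
    }

    // Push: the consumer subscribes first, then the source hands items over.
    public static List<int> Push(PushSource<int> source, params int[] futureValues)
    {
        var observer = new CollectingObserver<int>();
        using (source.Subscribe(observer))
        {
            foreach (var v in futureValues)
                source.Publish(v);
        }
        return observer.Received;
    }
}
```

Pull walks a sequence that already exists, while Push subscribes first and only sees values that are published afterwards.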

Here’s a simple example that helps demonstrate the power of Rx. Say you’re writing a client app (such as a WPF application) and want the application to persist its window’s position and size. That way, the next time the app starts, the position is restored.

How you save the position isn’t so important, but if you’re curious, I found this post, Saving window size and location in WPF and WinForms, helpful.

I modified it in two ways for my needs. First, I replaced the Settings object with an asynchronous cache as the storage for the placement info.

I then changed it to save the placement info when the window is resized, rather than when the application exits. That way, if the app crashes, it won’t forget its last position.

Handling Resize Events

So let’s think about this a bit. When you resize a window, the resize event might be fired a large number of times. We probably don’t want to save the position on every one of those calls. It’s not just a performance problem; it could also be a data corruption problem if I’m using an async method to save the placement. When so many calls happen so close together, a later save might complete before an earlier one, so the earlier, stale placement would be written last.

What we really want to do is save the setting when there’s a pause during a resize operation. For example, a user starts to resize the window, then stops. Five seconds later, if there’s been no other resize operation, only then do we save the setting.

How would you do this with traditional code? You could probably figure it out, but it’d be ugly. Perhaps have the resize event start a timer for five seconds, if it isn’t started already. Each subsequent event would reset the timer. When the timer finishes, it saves the setting and turns itself off. The code is going to be a bit gnarly and all over the place.
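
For comparison, here’s a rough sketch of that timer-based approach. RestartingTimerSaver is a hypothetical name, and this version assumes System.Threading.Timer, whose callback runs on a thread-pool thread. A real WPF app would probably need a DispatcherTimer or some marshaling back to the UI thread, which is part of why the traditional code gets gnarly.

```csharp
using System;
using System.Threading;

// Hypothetical class for illustration: restarts a one-shot timer on every
// signal and runs the action only once the signals stop for `quietPeriod`.
public sealed class RestartingTimerSaver : IDisposable
{
    private readonly Timer timer;
    private readonly TimeSpan quietPeriod;

    public RestartingTimerSaver(TimeSpan quietPeriod, Action save)
    {
        this.quietPeriod = quietPeriod;
        // Created disarmed; it only starts counting once Signal is called.
        // The infinite period makes it one-shot, so it turns itself off.
        timer = new Timer(_ => save(), null,
            Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
    }

    // Call this from the resize handler. Each call pushes the deadline back.
    public void Signal()
    {
        timer.Change(quietPeriod, Timeout.InfiniteTimeSpan);
    }

    public void Dispose() { timer.Dispose(); }
}
```

The window’s SizeChanged handler would call Signal() on every event, and the save action would run once the calls stop for the configured period.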

Here’s what it looks like with Rx.

Observable.FromEventPattern<SizeChangedEventHandler, SizeChangedEventArgs>
    (h => SizeChanged += h, h => SizeChanged -= h)
    .Throttle(TimeSpan.FromSeconds(5), RxApp.DeferredScheduler)
    .Subscribe(_ => this.SavePlacement());

That’s it! Nice and self-contained in a single expression.

Let’s break it down a bit.

Observable.FromEventPattern<SizeChangedEventHandler, SizeChangedEventArgs>
    (h => SizeChanged += h, h => SizeChanged -= h)

This first part of the expression converts the SizeChanged event into an observable. The specific type of this observable is IObservable<EventPattern<SizeChangedEventArgs>>. This is analogous to an IEnumerable<EventPattern<SizeChangedEventArgs>>, but with its polarity reversed. Having an observable will allow us to subscribe to a stream of size changed events. But first:

.Throttle(TimeSpan.FromSeconds(5), RxApp.DeferredScheduler)

This next part of the expression uses the Throttle method to throttle the sequence of events coming from the observable. It will ignore events in the sequence if a newer one arrives within the specified time span. In other words, this observable won’t produce any item until there’s a five second lull in events.

The RxApp.DeferredScheduler comes from the ReactiveUI framework and is equivalent to new DispatcherScheduler(Application.Current.Dispatcher). It indicates which scheduler to run the throttle timers on. In this case, we indicate the dispatcher scheduler which runs the throttle timer on the UI thread.
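
The lull rule can be modeled without any timers or schedulers at all. The following is a minimal sketch, not how Rx implements Throttle: LullModel.NotificationTimes is a hypothetical helper that takes a finished list of event timestamps and works out when a throttled subscriber would hear about them. It assumes the stream stays open after the last event, and it treats a gap exactly equal to the quiet period as a lull, which is an arbitrary choice.

```csharp
using System;
using System.Collections.Generic;

public static class LullModel
{
    // Hypothetical helper, not an Rx API. Given event timestamps in ascending
    // order, returns the times at which a throttled subscriber would be
    // notified: `quiet` after each event that no newer event follows in time.
    public static List<TimeSpan> NotificationTimes(
        IReadOnlyList<TimeSpan> events, TimeSpan quiet)
    {
        var notifications = new List<TimeSpan>();
        for (int i = 0; i < events.Count; i++)
        {
            bool isLast = i == events.Count - 1;
            // An event survives only if the next one arrives after the quiet
            // period has fully elapsed (or if there is no next one at all).
            if (isLast || events[i + 1] - events[i] >= quiet)
                notifications.Add(events[i] + quiet);
        }
        return notifications;
    }
}
```

For events at 0, 1, 2, and 10 seconds with a five second quiet period, this yields notifications at 7 and 15 seconds: the first three events collapse into one, and the last stands alone.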

.Subscribe(_ => this.SavePlacement());

And we end with the Subscribe call. This method takes in an Action to run for each item in the observable sequence when it arrives. This is where we do the work to actually save the window placement.

Putting it all together, every time a resize event is followed by a five second lull, we save the placement of the window.

But wait, compose more

Ok, that’s pretty cool. But writing imperative code to do this would only be slightly ugly and not all that hard. Ok, let’s up the stakes a bit, shall we?

We forgot something. You don’t just want to save the placement of the window when it’s resized. You also want to save it when it’s moved.

So we really need to observe two sequences of events, but still throttle both of them as if they were one sequence. In other words, when either a resize or move event occurs, the timer is restarted. And only when five seconds have passed since either event has occurred, do we save the window placement.

The traditional way to code this is going to be very ugly.

This is where Rx shines. Rx provides ways to compose observables in very interesting ways. In this case we’ll deal with two observables, the one we already created that handles SizeChanged events, and a new one that handles LocationChanged events.

Here’s the code for the LocationChanged observable. I’ll save the observable into an intermediate variable for clarity. It’s exactly what you’d expect.

var locationChanges = Observable.FromEventPattern<EventHandler, EventArgs>
  (h => LocationChanged += h, h => LocationChanged -= h);

I’ll do the same for the SizeChanged event.

var sizeChanges = Observable.FromEventPattern
    <SizeChangedEventHandler, SizeChangedEventArgs>
    (h => SizeChanged += h, h => SizeChanged -= h);

We can use the Observable.Merge method to merge these sequences into a single sequence. But going back to the IEnumerable analogy, these are both sequences of different types. If you had two enumerables of different types and wanted to combine them into a single enumerable, what would you do? You’d apply a transformation with the Select method! And that’s what we do here too.

Since I don’t care what the event arguments are, just when they arrive, I’ll transform each sequence into an IObservable<Unit> by calling Select(_ => Unit.Default) on each observable. Unit is an Rx type that indicates there’s no information. It’s like returning void.

var merged = Observable.Merge(
    sizeChanges.Select(_ => Unit.Default), 
    locationChanges.Select(_ => Unit.Default)
);

The Observable.Merge call merges the two projected sequences together into a single sequence of Unit values.
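
The IEnumerable side of that analogy can be sketched with plain LINQ to Objects. Tick is a hypothetical stand-in for Rx’s Unit, and the int and string element types are arbitrary placeholders for the two kinds of event args. One caveat: Concat appends the second sequence after the first, whereas Observable.Merge interleaves items as they arrive, so this only illustrates the projection step.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for Rx's Unit: a type that carries no information.
public struct Tick
{
    public static readonly Tick Default = new Tick();
}

public static class SelectThenCombine
{
    // Two sequences of unrelated element types cannot be combined directly,
    // so project each one onto the same "something happened" type first.
    public static List<Tick> Combine(
        IEnumerable<int> sizes, IEnumerable<string> locations)
    {
        return sizes.Select(_ => Tick.Default)
            .Concat(locations.Select(_ => Tick.Default))
            .ToList();
    }
}
```

Once both sides share an element type, any operator that works on one sequence works on the combined one, which is the property the throttle step relies on.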

Now, with this combined sequence, I can simply apply the same throttle and subscription I did before.

merged
    .Throttle(TimeSpan.FromSeconds(5), RxApp.DeferredScheduler)
    .Subscribe(_ => this.SavePlacement());

Think about that for a second. I was able to compose various sequences of events into a single observable and I didn’t have to change the code to throttle the events or to subscribe to them.

As you get more familiar with Rx, it starts to get easier to read the code and you tend to use fewer intermediate variables. Here’s the full, more idiomatic expression.

Observable.Merge(
    Observable.FromEventPattern<SizeChangedEventHandler, SizeChangedEventArgs>
        (h => SizeChanged += h, h => SizeChanged -= h)
        .Select(e => Unit.Default),
    Observable.FromEventPattern<EventHandler, EventArgs>
        (h => LocationChanged += h, h => LocationChanged -= h)
        .Select(e => Unit.Default)
).Throttle(TimeSpan.FromSeconds(5), RxApp.DeferredScheduler)
.Subscribe(_ => this.SavePlacement());

That single declarative expression handles so much crazy logic. Very powerful stuff.

Even if you don’t write WPF apps, there’s still probably something useful here for you. This same powerful approach is also available for JavaScript.

See it in action

I put together a really rough sample app that demonstrates this concept. It’s not using the async cache, but it is using Rx to throttle resize and move events and then save the placement of the window after five seconds.

Just grab the WindowPlacementRxDemo project from my CodeHaacks GitHub repository.

More Info

For more info on Reactive Extensions, I recommend the following:

Tags: Rx, Reactive-Extensions, RxUI, Reactive-UI, WPF

Comments

19 responses

  1. Steve April 9th, 2012

    I just wish it had a syntax that wasn't so hard to understand.

  2. Johan Lindfors April 9th, 2012

    Great post Phil, informative and easy to understand.

  3. Omer Mor April 9th, 2012

    Welcome aboard the Rx train Phil!
    Personally I'd write your last query a bit differently (matter of taste):
    I'd select a Unit.Default from the events (instead of EventArgs): Unit is an Rx type to indicate no information. Kinda like the equivalent of the System.Void type, but unlike void it can be passed as an argument. I'd use Unit because that's all the information I need at that point.

  4. Frank Quednau April 9th, 2012

    Thanks, you just gave me a new idea to let membus not only be a source of observables but to have it consume observables as a source of messages. Inspirational :)

  5. Steve W April 9th, 2012

    I used Rx on a Silverlight / WCF project a while ago and it really simplified the client side service aggregation code and various other bits in the UI - it's a really nice way to write async code :)

  6. James Hughes April 9th, 2012

    I've been working on a Scala project recently in my job and there is a new thing in the Scala world called Iteratees which are essentially what Rx is all about. Odd thing is the Scala world appear to be lapping them up - yet I feel the .NET world is still keeping Rx on the side lines. Probably because of the more functional nature of Scala over C# they feel more natural to the developer. That said Rx does appear to be gaining some steady momentum recently - hopefully it will continue.

  7. haacked April 9th, 2012

    @Omer yeah, I use Unit.Default all the time. I think I thought I would need some info but later realized I wouldn't. Great suggestion. I updated the blog post.

  8. Ben Reich April 9th, 2012

    After the first time my head stopped asploding from Rx, I combined it with the much underappreciated Sql Server QueryNotifications to build Observables, and whaddya know, yet more asploding!
    Building a cloud app becomes so much easier when each component can be built as an Observable.

  9. Derek April 9th, 2012

    Have you been hanging out with that Betts character?

  10. Dave April 9th, 2012

    Your post does a great job of highlighting one of the primary features of Rx: coordination of events. I believe it's very important for devs that are new to Rx to understand the benefits of LINQ compared to imperative-style code.
    Though it's equally as important to understand that Rx goes a step farther by introducing the concepts of asynchrony into LINQ. Your query can be improved by considering asynchrony. First, realize that sequences can (and should) contain data. The events that indicate when a window resizes or moves are raised on the UI thread, so perhaps it would be best to change the semantics of the query right away; i.e., project the events via the Select operator into a sequence of window positions: IObservable<Rect>. That way you can keep Throttle entirely asynchronous, without passing in the DispatcherScheduler. Think about it this way: Why force the remainder of the query back onto the UI thread simply to grab a Rect? Furthermore, if the remainder of the query (e.g., SavePlacement) is going to do file I/O, then you wouldn't want it to block the UI thread. So of course you make it save asynchronously. But now you'll probably want to bring it back into the world of the monad before doing anything else asynchronous, because that's where async coordination is thread-safe; e.g., to prevent older data from overwriting newer data (though I understand that the interval for throttle is quite large to avoid this race condition, but the reason is not obvious from the semantics of the query, which is bad for future maintenance.)
    This way introduces concurrency by executing SavePlacement on a pooled thread, but it ensures that no older data will ever overwrite previous data.
    var sizeChanges =
    from e in Observable.FromEventPattern<SizeChangedEventHandler, SizeChangedEventArgs>(h => SizeChanged += h, h => SizeChanged -= h)
    select new Rect(Left, Top, Width, Height);
    var locationChanges =
    from e in Observable.FromEventPattern<EventHandler, EventArgs>(h => LocationChanged += h, h => LocationChanged -= h)
    select new Rect(Left, Top, Width, Height);
    sizeChanges.Merge(locationChanges)
    .Throttle(TimeSpan.FromMilliseconds(250))
    .Subscribe(SavePlacement);
    ...
    void SavePlacement(Rect windowPlacement) // not an async method
    Alternatively, if you don't like the "extra" concurrency that Throttle introduces, then you should include the file I/O in the query itself:
    sizeChanges.Merge(locationChanges)
    .Throttle(TimeSpan.FromMilliseconds(250), DispatcherScheduler.Instance)
    .Select(rect => Observable.FromAsync(() => SavePlacement(rect))) // Rx 2.0 Beta
    .Concat() // ensures that older data doesn't overwrite newer data
    .Subscribe(); // execute the query for its side-effects
    ...
    async Task SavePlacement(Rect windowPlacement) // C# 5

  11. Dave April 9th, 2012

    Correction:
    "This way introduces concurrency by executing SavePlacement on a pooled thread, but it ensures that no older data will ever overwrite previous data."
    - should have been -
    "This way introduces concurrency by executing SavePlacement on a pooled thread, but it ensures that no older data will overwrite newer data."

  12. Justin April 9th, 2012

    I think I understand the basic idea but will need to put in a few hours playing with it.

  13. Giorgi April 10th, 2012

    Excellent post Phil it demonstrates power of RX in a simple and understandable way.
    Shameless plug: I also wrote a post about RX recently to show how it can simplify complex code and make it more readable. The post is available at www.aboutmycode.com/... By the way, Paul also had a look at it ;)

  14. Greg April 10th, 2012

    Built in Throttle will be nice. Have a good chunk of (well encapsulated) code to deal with this now.

  15. Chris Sells April 10th, 2012

    Why are all the RX samples I've seen of UI stuff? Is that the only use?

  16. Giorgi April 10th, 2012

    Chris,
    I'm just a beginner on rx so I may be mistaken but rx makes processing event streams very easy and UI is a great source for event streams.

  17. RichB October 23rd, 2012

    Wow - this throttling works beautifully on WinForms with ControlScheduler instead of DispatcherScheduler. Love it.

  18. Порталы каминов December 6th, 2012

    Have you been hanging out with that Betts character?

  19. peter March 8th, 2015

    Brilliant! I've followed your blog posts for a while, and they've been great. But only now (with a non-technical post nonetheless) have I been compelled to write a comment. These non-technical posts are what really hits home the most.