Writing a ContinueAfter method for Rx


With Reactive Extensions you sometimes need one observable sequence to run after another observable sequence completes. Perhaps the first one has side effects the second one depends on. Egads! I know, side effects are evil in this functional world, but it happens.

Let’s make this more concrete with some contrived sample code.

using System;
using System.Reactive.Linq;

public static class BlogDemo
{
  public static IObservable<int> Extract()
  {
    return new[] { 10, 20, 70, 100 }.ToObservable();
  }

  public static IObservable<string> GetStuffs()
  {
    return new[] { "Family Guy", "Cheetos", "Rainbows" }.ToObservable();
  }
}

Here we have a class with two methods. The first method extracts something and returns a sequence of progress indicators. The second method returns an observable sequence of strings (good stuffs I hope). With me so far?

Now suppose I need to write a third method; let’s call it GetStuffsAfterExtract. This method needs to run Extract and, only when that is complete, return the sequence from GetStuffs. How would we approach this?

Well, in the Task-based world, Extract would probably return a Task<T> instead of an observable. Task<T> represents a single future value as opposed to a sequence of future values. If we did that, we could use the ContinueWith method (or simply use the await keyword in .NET 4.5).
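For comparison, here’s a rough sketch of what that Task-based version might look like. ExtractAsync, GetStuffsAsync, and the TaskDemo class are hypothetical stand-ins made up for this illustration, not part of the sample above; it only needs the .NET base class library.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskDemo
{
    // Hypothetical Task-based Extract: a single future value instead of a sequence.
    public static Task<int> ExtractAsync() => Task.FromResult(100);

    // Hypothetical Task-based GetStuffs.
    public static Task<string[]> GetStuffsAsync() =>
        Task.FromResult(new[] { "Family Guy", "Cheetos", "Rainbows" });

    // ContinueWith: the lambda runs once ExtractAsync has finished. With default
    // options it also runs if ExtractAsync faults. ContinueWith returns a
    // Task<Task<string[]>> here, so Unwrap flattens it to Task<string[]>.
    public static Task<string[]> GetStuffsAfterExtractWithContinueWith() =>
        ExtractAsync().ContinueWith(_ => GetStuffsAsync()).Unwrap();

    // await: the rest of the method runs after ExtractAsync completes;
    // if ExtractAsync faults, the exception is rethrown here instead.
    public static async Task<string[]> GetStuffsAfterExtractWithAwait()
    {
        await ExtractAsync();
        return await GetStuffsAsync();
    }
}
```

One difference worth noting: with default options, ContinueWith runs its continuation even when the antecedent task faults, whereas await rethrows the exception and skips the rest of the method.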

But I don’t live in that world. I live in shiny happy RxLand where we always deal with sequences. Future sequences! It’s an awesome zany world.

Note that the method I want to write doesn’t care about the actual sequence of values from Extract. All I care about is knowing when it completes; then it returns the sequence of values from GetStuffs.

Here’s one way to do it with Observable.Start:

public static IObservable<string> GetStuffsAfterExtract()
{
    return Observable.Start(() =>
    {
        Extract().Wait(); 
        return GetStuffs();
    }).Merge();
}

This works, but it’s not optimal. Observable.Start guarantees a context switch to a pool thread (the TaskPool) rather than running the operation on the current thread, and Wait() then blocks that pool thread until Extract completes.

Also, it’s ugly.
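If you want to see the context switch for yourself, here’s a small sketch that compares the calling thread with the thread Observable.Start runs its work on. It assumes the System.Reactive (Rx.NET) package; StartDemo and CompareThreads are names made up for the illustration.

```csharp
using System;
using System.Reactive.Linq;

public static class StartDemo
{
    // Returns the managed thread id of the caller and of the thread
    // on which Observable.Start actually ran the work.
    public static (int Caller, int Worker) CompareThreads()
    {
        int caller = Environment.CurrentManagedThreadId;

        // With no scheduler argument, Observable.Start uses Rx's default
        // scheduler, which queues the function onto a pool thread. Wait()
        // then blocks the caller until that result arrives.
        int worker = Observable.Start(() => Environment.CurrentManagedThreadId).Wait();

        return (caller, worker);
    }
}
```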

Let’s try again:

public static IObservable<string> GetStuffsAfterExtract()
{
  return Extract().TakeLast(1).SelectMany(_ => GetStuffs());
}

A little better. This works pretty well in most situations. If you’re wondering what the underscore character is in the SelectMany statement, that’s the name I use for lambda parameters to indicate that the parameter is not needed. It’s a convention I learned a while back from someone on the ASP.NET team. It makes my intention not to use the parameter clear.

But what happens if Extract legitimately returns an empty observable, aka Observable.Empty<int>()? In this case, I could just change it to not do that since I wrote it. But maybe I’m calling a method written by someone else, and we don’t trust that person to do everything perfectly like me.

Well, GetStuffs will never get called, because SelectMany invokes its selector once for each element of the source sequence and merges the resulting sequences. TakeLast(1) of an empty sequence is also empty, so there are no elements and nothing for SelectMany to do. Hey, maybe that’s exactly what you want!
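Here’s a quick sketch of that empty case, using Observable.Empty<int>() as a stand-in for a misbehaving Extract and a counter to record whether GetStuffs is ever invoked. It assumes the System.Reactive package; EmptyDemo, Calls, and Run are hypothetical names for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class EmptyDemo
{
    // Counts how many times GetStuffs is invoked.
    public static int Calls;

    static IObservable<string> GetStuffs()
    {
        Calls++;
        return new[] { "Family Guy", "Cheetos", "Rainbows" }.ToObservable();
    }

    public static IList<string> Run()
    {
        // Stand-in for an Extract that legitimately produces no values.
        IObservable<int> extract = Observable.Empty<int>();

        // TakeLast(1) of an empty sequence is empty, so SelectMany never
        // receives an element and never calls its selector.
        return extract.TakeLast(1)
            .SelectMany(_ => GetStuffs())
            .ToList()   // collects whatever arrives (here: nothing)
            .Wait();    // blocks until the sequence completes
    }
}
```

Run() comes back with an empty list and Calls stays at 0: the selector passed to SelectMany simply never executes.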

But that’s not what I want in this case. So with the help of my co-worker Paul “Rx Master of Disaster” Betts, we went back and forth through several iterations.

I figured the first step is to write a method that represents the completion of an observable sequence, whether it’s empty or not. I’ll call this method AsCompletion, and it’ll return a new sequence with a single Unit.Default when the original sequence completes. It turns out that the Aggregate method is great for this (just like in standard LINQ!):

public static IObservable<Unit> AsCompletion<T>(this IObservable<T> observable)
{
  return observable.Aggregate(Unit.Default, (accumulation, _) => accumulation);
}

Aggregate is typically used to reduce a sequence to a single value by repeatedly applying an accumulator function (it’s LINQ’s name for what functional languages call a fold). But in this case, I don’t care about any of the individual values. I simply keep returning the accumulation unchanged.

The first parameter to Aggregate is a seed value. Since I seeded the accumulation with Unit.Default, it keeps returning that same value, and if the source is empty, Aggregate simply emits the seed when the source completes.

In other words, this method returns a sequence of exactly one Unit.Default when the sequence it’s called on completes, whether it’s empty or not. Cool.
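To convince yourself, here’s a minimal check that counts how many values AsCompletion produces for an empty source and for a source with values (Extract’s 10, 20, 70, 100). It assumes the System.Reactive package; AggregateDemo and its helper methods are made up for this illustration, and AsCompletion is copied from above.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class AggregateDemo
{
    // AsCompletion as written above: the seed is returned unchanged for every
    // element, and Aggregate emits the final accumulation (the seed itself for
    // an empty source) when the source completes.
    public static IObservable<Unit> AsCompletion<T>(this IObservable<T> observable)
    {
        return observable.Aggregate(Unit.Default, (accumulation, _) => accumulation);
    }

    // Number of values AsCompletion emits for the given source.
    static int CompletionCount<T>(IObservable<T> source) =>
        source.AsCompletion().Count().Wait();

    public static int CountForEmpty() => CompletionCount(Observable.Empty<int>());

    public static int CountForValues() =>
        CompletionCount(new[] { 10, 20, 70, 100 }.ToObservable());
}
```

Both CountForEmpty() and CountForValues() return 1.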

Now I can use this to build my ContinueAfter method. (I didn’t name it ContinueWith because we don’t actually do anything with the previous values, and I want to make it clear that we’re doing work after the sequence completes, not as values come in.)

public static IObservable<TRet> ContinueAfter<T, TRet>(
  this IObservable<T> observable, Func<IObservable<TRet>> continuation)
{
  return observable.AsCompletion().SelectMany(_ => continuation());
}

You’ll notice that the body of this method looks pretty similar to my second attempt, but instead of TakeLast(1) I’m using the AsCompletion method.

With this method in place, I can rewrite the code I set out to write as:

public static IObservable<string> GetStuffsAfterExtract()
{
    return Extract().ContinueAfter(GetStuffs);
}

That is much more lickable code. One thing I like about this method is that it takes a parameterless Func. That makes it clear that it won’t pass your expression a value you’d only want to ignore, and in this case it lets me pass in a method group.
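Here’s a sketch of the whole thing wired together, with a log that records when each progress value from Extract passes through and when GetStuffs is actually called. It assumes the System.Reactive package; the Log list and the Do call are additions for the illustration, and AsCompletion and ContinueAfter are copied from above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class ContinueAfterDemo
{
    public static IObservable<Unit> AsCompletion<T>(this IObservable<T> observable)
    {
        return observable.Aggregate(Unit.Default, (accumulation, _) => accumulation);
    }

    public static IObservable<TRet> ContinueAfter<T, TRet>(
        this IObservable<T> observable, Func<IObservable<TRet>> continuation)
    {
        return observable.AsCompletion().SelectMany(_ => continuation());
    }

    // Records the order in which things happen (added for this illustration).
    public static readonly List<string> Log = new List<string>();

    static IObservable<int> Extract() =>
        new[] { 10, 20, 70, 100 }.ToObservable()
            .Do(progress => Log.Add("progress " + progress));

    static IObservable<string> GetStuffs()
    {
        Log.Add("GetStuffs called");
        return new[] { "Family Guy", "Cheetos", "Rainbows" }.ToObservable();
    }

    // Same call shape as in the post: GetStuffs is passed as a method group.
    public static IList<string> Run() =>
        Extract().ContinueAfter(GetStuffs).ToList().Wait();
}
```

After Run(), the log shows the four progress entries followed by "GetStuffs called", because the continuation is only invoked once AsCompletion emits its single Unit.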

I write this knowing full well that someone who’s a much better Rx master than I am will point out an even better approach. I welcome it! For now, this is working pretty well for me.

Oh, I almost forgot. I posted the unit tests I have so far for this method as a gist.

UPDATE 10/9/2012 Just as I expected, folks chimed in with better ideas. Some asked why I didn’t just use Concat, since it’s perfect for this. The funny thing is I did think about using it, but I dismissed it because it requires both sequences to be of the same type, as someone pointed out in my comments.

But then it occurred to me, I’m using Rx. I can transform sequences! So here’s my new ContinueAfter implementation.

public static IObservable<TRet> ContinueAfter<T, TRet>(
  this IObservable<T> observable
  , Func<IObservable<TRet>> continuation)
{
  return observable.Select(_ => default(TRet))
    .IgnoreElements()
    .Concat(continuation());
}
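One property of this version is worth being aware of: continuation() is called as soon as ContinueAfter itself is called, because Concat needs an already-constructed observable as its argument. The second sequence isn’t subscribed to until the first completes, but any side effects inside the continuation method itself happen up front. Here’s a sketch comparing this version with a variant that wraps the call in Observable.Defer, which delays it until Concat subscribes to the second sequence. It assumes the System.Reactive package; the class and method names are made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ConcatDemo
{
    // The Concat version from the update above: continuation() is evaluated
    // right here, when ContinueAfterEager is called, to produce Concat's argument.
    public static IObservable<TRet> ContinueAfterEager<T, TRet>(
        this IObservable<T> observable, Func<IObservable<TRet>> continuation)
    {
        return observable.Select(_ => default(TRet))
            .IgnoreElements()
            .Concat(continuation());
    }

    // Variant: Observable.Defer calls continuation() only when Concat subscribes
    // to the second sequence, which happens after the first one completes.
    public static IObservable<TRet> ContinueAfterDeferred<T, TRet>(
        this IObservable<T> observable, Func<IObservable<TRet>> continuation)
    {
        return observable.Select(_ => default(TRet))
            .IgnoreElements()
            .Concat(Observable.Defer(continuation));
    }

    // Builds (but never subscribes to) both versions over a source that never
    // completes, and reports which continuations were invoked anyway.
    public static (bool EagerCalled, bool DeferredCalled) Demo()
    {
        bool eagerCalled = false, deferredCalled = false;
        IObservable<int> neverCompletes = Observable.Never<int>();

        neverCompletes.ContinueAfterEager(() => { eagerCalled = true; return Observable.Return("x"); });
        neverCompletes.ContinueAfterDeferred(() => { deferredCalled = true; return Observable.Return("x"); });

        return (eagerCalled, deferredCalled);
    }

    // The deferred version still runs the continuation once the source completes.
    public static IList<string> DeferredAfterEmpty() =>
        Observable.Empty<int>()
            .ContinueAfterDeferred(() => Observable.Return("stuff"))
            .ToList()
            .Wait();
}
```

Demo() reports that the eager version called its continuation even though the source never completes and nothing subscribed, while the deferred one did not.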

I also updated the AsCompletion method since I use that in other places.

public static IObservable<Unit> AsCompletion<T>(this IObservable<T> observable)
{
    return observable.Select(_ => Unit.Default)
        .IgnoreElements()
        .Concat(Observable.Return(Unit.Default));
}

Please note that I didn’t have to change ContinueAfter. I could have changed just AsCompletion and I would have been OK. I changed it here to show I could have written it cleanly with existing Rx operators. Also, and I should test this later, it’s probably more efficient to have one Concat call than two.

I added another unit test to the gist I mentioned that makes sure that the second observable doesn’t run if the first one has an exception. If you still want it to run, you can catch the exception and do the right thing.
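Here’s a sketch of what that test checks, together with the “catch the exception” escape hatch: Catch swaps the error for an empty sequence, so the continuation runs. It assumes the System.Reactive package and composes the updated AsCompletion with the SelectMany-based ContinueAfter from earlier; ErrorDemo and Run are hypothetical names, not the tests from my gist.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class ErrorDemo
{
    // AsCompletion as updated above (Select + IgnoreElements + Concat).
    public static IObservable<Unit> AsCompletion<T>(this IObservable<T> observable)
    {
        return observable.Select(_ => Unit.Default)
            .IgnoreElements()
            .Concat(Observable.Return(Unit.Default));
    }

    // ContinueAfter built on AsCompletion and SelectMany, as earlier in the post.
    public static IObservable<TRet> ContinueAfter<T, TRet>(
        this IObservable<T> observable, Func<IObservable<TRet>> continuation)
    {
        return observable.AsCompletion().SelectMany(_ => continuation());
    }

    // Runs ContinueAfter over a source that fails, optionally swallowing the
    // error first, and reports whether the continuation ran and what was thrown.
    public static (bool ContinuationCalled, Exception Error) Run(bool swallowError)
    {
        bool called = false;
        Exception error = null;

        IObservable<int> extract = Observable.Throw<int>(new InvalidOperationException("boom"));
        if (swallowError)
        {
            // Opting in: replace the error with an empty, completed sequence.
            extract = extract.Catch(Observable.Empty<int>());
        }

        try
        {
            extract
                .ContinueAfter(() => { called = true; return Observable.Return("stuff"); })
                .ToList()
                .Wait(); // rethrows the source's exception if the sequence fails
        }
        catch (InvalidOperationException ex)
        {
            error = ex;
        }

        return (called, error);
    }
}
```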

UPDATE 10/10/2012 OK, after some real-world testing, I’m finding issues with the Concat approach. Another commenter, Benjamin, came forward with the most straightforward approach. To be honest, it’s one I originally had, but I wanted to try a more “functional” approach. But what I’m doing is definitely not functional, since I’m dealing with side effects.

Here’s my final (hopefully) implementation.

public static IObservable<Unit> AsCompletion<T>(this IObservable<T> observable)
{
  return Observable.Create<Unit>(observer =>
  {
    Action onCompleted = () =>
    {
      observer.OnNext(Unit.Default);
      observer.OnCompleted();
    };
    return observable.Subscribe(_ => {}, observer.OnError, onCompleted);
  });
}

public static IObservable<TRet> ContinueAfter<T, TRet>(
  this IObservable<T> observable, Func<IObservable<TRet>> selector)
{
  return observable.AsCompletion().SelectMany(_ => selector());
}
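Here’s a quick sanity check of this final version against the case that started all this: an Extract that returns Observable.Empty<int>(). The two methods are copied from above; FinalDemo and RunWithEmptyExtract are made-up names, and it assumes the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class FinalDemo
{
    // Final AsCompletion from above: ignore values, forward errors, and emit a
    // single Unit followed by completion when the source completes.
    public static IObservable<Unit> AsCompletion<T>(this IObservable<T> observable)
    {
        return Observable.Create<Unit>(observer =>
        {
            Action onCompleted = () =>
            {
                observer.OnNext(Unit.Default);
                observer.OnCompleted();
            };
            return observable.Subscribe(_ => { }, observer.OnError, onCompleted);
        });
    }

    public static IObservable<TRet> ContinueAfter<T, TRet>(
        this IObservable<T> observable, Func<IObservable<TRet>> selector)
    {
        return observable.AsCompletion().SelectMany(_ => selector());
    }

    // An Extract that produces nothing: the continuation must still run.
    public static IList<string> RunWithEmptyExtract() =>
        Observable.Empty<int>()
            .ContinueAfter(() => new[] { "Family Guy", "Cheetos", "Rainbows" }.ToObservable())
            .ToList()
            .Wait();
}
```

RunWithEmptyExtract() returns all three strings, since AsCompletion emits its Unit on completion regardless of whether any values came through.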

Comments


21 responses

  1. Caleb, October 8th, 2012

    I think I would implement it using Materialize() as shown in this gist: https://gist.github.com/3856008
    I think it makes things a little clearer, although it may perform more poorly due to the need to create notification objects.

  2. Benjamin, October 8th, 2012

    I think I would have used Materialize too.
    Don't know if it's more performant though.

  3. Benjamin, October 9th, 2012

    And OnErrorResumeNext might be a possibility if you want to run the second observable whether the first one succeeded or failed.

  4. Jerome Laban, October 9th, 2012

    The fact that you need to chain two methods like this indicates that you have state somewhere, and that you're not using the output of the first observable to feed the second. You probably need to watch out for concurrent subscriptions that may corrupt your state.
    If you need to mimic the single-value return of a method, you could use this:
    public static IObservable<string[]> Result()
    {
        return Extract()
            .ToArray()
            .Select(stuff => GetStuff(stuff).ToArray());
    }
    You could still do a SelectMany instead of Select and ToArray if you really need your results provided as a stream instead of as a single result.
    But if you really can't do it any other way, I suggest using Observable.IgnoreElements() followed by a Select(Unit.Default) instead of Aggregate, which executes a lambda unnecessarily in this case.
    @Caleb: This gist is ignoring exceptions from the source observable. The Materialize/Where pair does not forward the OnError notification downstream, which can hide some pretty nasty bugs. You should probably use Notification.CreateOnNext<>() and Notification.CreateOnError<>() followed by Dematerialize, instead of projecting to the result type with a simple Observable.Select.

  5. Jerome Laban, October 9th, 2012

    A small correction; it should be:
    public static IObservable<string[]> Result()
    {
        return Extract()
            .ToArray()
            .SelectMany(stuff => GetStuff(stuff).ToArray());
    }
    And not just Select. SelectMany is needed to unwrap the Observable.ToArray().

  6. Ken Egozi, October 9th, 2012

    nit - using underscore for a "whatever" parameter name probably comes from Prolog (or a similar language), where it denotes an 'anonymous variable': a variable that is never actually bound to a value, so it acts as a 'whatever'.

  7. gandjustas, October 9th, 2012

    Why not just use Concat?

  8. Ray Booysen, October 9th, 2012

    Agreed, Concat is made for this. If your Extract() stream errors, you probably don't want to do the GetStuffs() anyway, as you're in an unknown state and would handle that in your OnError handler. Concat, however, fulfills the requirement: after the first stream, start the next.

  9. Frank Quednau, October 9th, 2012

    Wouldn't you need the two observables to be of the same type in the case of Concat?
    Also, since you can push exceptions to Observer, I am sure you can write a version that will not start pushing out elements if the previous Observable failed.

  10. haacked, October 9th, 2012

    @Frank yeah, but you could transform the first sequence like so:

    public static IObservable<TRet> ContinueAfter<T, TRet>(
      this IObservable<T> observable, Func<IObservable<TRet>> selector)
    {
      return observable.Select(_ => default(TRet))
        .IgnoreElements()
        .Concat(selector());
    }

  11. haacked, October 9th, 2012

    @Jerome I'm not too concerned about Aggregate calling a lambda. After all, in your solution you suggest calling ToArray. The implementation of that calls ToList. The implementation of that calls Aggregate. So we're back to square one! :)

  12. haacked, October 9th, 2012

    @Caleb I added a new unit test to my gist that should have been there from the beginning. The Materialize approach does indeed fail this test. The Concat method does indeed work and passes this test.

  13. Paul Betts, October 9th, 2012

    This is a great conversation! One of the cool things about Rx is that there are different ways to do things, but they all make you think about the underlying nature of the result - you can totally see that in this thread.

  14. Jerome Laban, October 9th, 2012

    @haacked Actually, the solution I suggested that calls ToArray() is more about *not* storing an intermediate external state between the two observable subscriptions. This means that the output of the first is really provided to the second, and you're more inclined to have a stable computation result, no matter how many subscriptions to the observable chain are active at the same time.
    This looks to me like the same debate as the one about why there is no IEnumerable.ForEach() in the BCL. It implicitly means that you introduce a side effect (an external state in this case), which I've found not to be a good idea in a query subscription/execution.

  15. Matt Ellis, October 9th, 2012

    Great discussion. Paul's right, half the fun of Rx is coming up with different ways of doing things - I sometimes think it's more of an art than a science.
    Which means of course, I have an opinion! Personally, I don't like that you're concatenating two unrelated streams (and that transformation is only there to satisfy the type system, which is a bit of a smell). You're not interested in the values of the first stream, just doing something when it finishes. There's already a construct for that - the onCompleted action passed to Subscribe.
    And I know it's an example scenario, but it also seems a little odd to be driving this from the progress indicators of an extract method. My problem with this is that this isn't the main process but a notification; its only purpose is to provide progress information. If you want to do anything with the data you've extracted, you need to refer back to the original object that produced the first stream, which means you've got state. I'd suggest it would be better to have a completed event on this original object, or a stream that exposed the data you're extracting so you can genuinely act on and transform that data.
    Chaining streams like this just seems so, well, *procedural*.

  16. Jerome Laban, October 9th, 2012

    @matt It definitely feels procedural :) This happens when Rx is just added on top of synchronous code.
    That happened when I started using Rx; then I started going functional. It helped a lot!

  17. Benjamin, October 9th, 2012

    And what about this one?
    public static IObservable<TRet> ContinueAfter<T, TRet>(this IObservable<T> observable, Func<IObservable<TRet>> selector)
    {
      return Observable.Create<Unit>(observer =>
      {
        Action onCompleted = () =>
        {
          observer.OnNext(Unit.Default);
          observer.OnCompleted();
        };
        return observable.Subscribe(_ => { }, err => observer.OnError(err), onCompleted);
      })
      .SelectMany(_ => selector());
    }

  18. haacked, October 10th, 2012

    @Benjamin that seems to be the most straightforward and correct solution compared to mine. I found a problem with using Concat. Not sure why yet, but it fails for me. :(

  19. Jordon, October 18th, 2012

    I am sorry to ask this here, but you are the best person to answer this question, and I would appreciate it if you could help me with this. Thank you.
    I have been hearing from people that ASP.NET MVC gives full control compared with traditional ASP.NET Web Forms, and that this is the best reason to do programming in ASP.NET MVC.

    Based on this, my questions:
    1) I believe ADO.NET gives more/full control than an ORM, so what is the reason for using an ORM, when we believe in having full control of what we are doing?
    2) One obvious advantage I can see with an ORM is that we can switch databases (e.g. Oracle to SQL Server) without changing anything in code, but this is a big decision, and I don't think anyone changes their database that frequently, so I don't think it is truly an advantage.
    3) RAD development with an ORM: but since we want full control, it is worth spending time doing things manually, and with a defined strategy for doing this we can develop things faster.
    4) I have seen in the past that Microsoft keeps recommending many things, and after some time Microsoft itself says that it is not recommended and you should use something else. Is this a Microsoft sales strategy?
    ASP.NET MVC vs ASP.NET Web Forms
    1) With the release of .NET 4.0 we can do almost everything we can do in ASP.NET MVC, and on top of that we have the advantage of RAD when we do it with ASP.NET Web Forms, so what is the benefit of using the ASP.NET MVC architecture?
    - For ViewState, we can turn it off
    - For full control of the HTML, we can generate HTML by dynamically creating controls (this way we can achieve full control of the HTML)
    - We have clean separation of UI and code, since in Web Forms we are not using any code logic
    - We have routing for Web Forms too, so we can have SEO-friendly URLs
    - I have also seen that performance is a little degraded with ASP.NET MVC compared with coding in ASP.NET Web Forms
    - MVC is just an architecture, which we can also do in Web Forms
    - We can also be TDD compliant with Web Forms
    - We have almost everything ASP.NET MVC has, plus the advantage of RAD and the simplicity of Web Forms, so what is the reason for switching to ASP.NET MVC?
    I truly don't have any convincing answer that draws me towards this new technology. I have seen a lot of people chase the buzz Microsoft creates, but I truly don't see any added value in this.
    To all the experts, could you please shed some light and help me understand the value of investing time and money in these new technologies?

  20. Dave, November 5th, 2012

    It seems to me that you could simplify your code using the IsEmpty operator.
    Admittedly, I haven't read the comments, so sorry if I'm duplicating something here or if I'm missing some requirements, though the following implementations pass your unit tests.
    Note: Defer isn't necessary, but perhaps it's more aligned with user expectations since continuation is a Func<>.
    public static IObservable<TResult> ContinueAfter<TSource, TResult>(
      this IObservable<TSource> source,
      Func<IObservable<TResult>> continuation)
    {
      return source.IsEmpty().SelectMany(Observable.Defer(continuation));
    }

    public static IObservable<Unit> AsCompletion<TSource>(this IObservable<TSource> source)
    {
      return source.IsEmpty().Select(_ => Unit.Default);
    }

  21. KarlTiggs, February 22nd, 2013

    I think these newfangled extensions can easily turn C#, a beautiful and approachable language, into (gasp!) C++. It was hard enough to get your head around its syntax and semantics, but templates turned C++ into an absurdly difficult language. I bet a very large number of older C# programmers are former C++ programmers who never looked back.

    You always hear that these features are optional, but in reality, you always feel like an idiot for not grokking them. You feel as if you're doing your craft a disservice by not using them, which is probably why you wrestled with Rx instead of picking a quick and dirty solution. LINQ was difficult, but it's definitely worth learning. Using R# helped nail it down for me.

    The problem I have with the Reactive Extensions is that they make you think too hard. It took me several reads before figuring out that IObservable is the producer and IObserver is the consumer. A better nomenclature such as IPublisher/ISubscriber would have made it a lot easier to figure out. IPimp/IProstitute or IPusher/IAddict would have been just as good, albeit crude.

    BTW:
    Rx is a symbol for a drug store or pharmacy. After I made the IPusher/IAddict remark, I am now wondering if the Rx is an inside joke playing off of Reactive's push mechanism? If this is true, it means Microsofties actually have a sense of humor.